Как указать, какую тему служебной шины Azure использовать с MassTransit

Я пробовал использовать MassTransit для публикации сообщения в теме с именем events в служебной шине Azure. У меня проблемы с настройкой MassTransit для использования моей предопределенной темы events, вместо этого он создает новую тему, названную пространством имен / именем класса для типа сообщения. Поэтому мне интересно, как указать, какую тему использовать вместо создания новой.

Это код, с которым я тестировал:

using System;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.AzureServiceBusTransport;
using Microsoft.ServiceBus;

namespace PublisherNameSpace
{
    public class Publisher
    {
        public static async Task PublishMessage()
        {
            var topic = "events";
            var bus = Bus.Factory.CreateUsingAzureServiceBus(
                cfg =>
                {
                    var azureServiceBusHost = cfg.Host(new Uri("sb://<busname>.servicebus.windows.net"), host =>
                    {
                        host.OperationTimeout = TimeSpan.FromSeconds(5);
                        host.TokenProvider =
                            TokenProvider.CreateSharedAccessSignatureTokenProvider(
                                "RootManageSharedAccessKey",
                                "<key>"
                            );
                    });

                    cfg.ReceiveEndpoint(azureServiceBusHost, topic, e =>
                    {
                        e.Consumer<TestConsumer>();
                    });
                });

            await bus.Publish<TestConsumer>(new TestMessage { TestString = "testing" });
        }
    }

    public class TestConsumer : IConsumer<TestMessage>
    {
        public Task Consume(ConsumeContext<TestMessage> context)
        {
            return Console.Out.WriteAsync("Consuming message");
        }
    }

    public class TestMessage
    {
        public string TestString { get; set; }
    }
}

person OriginalUtter    schedule 22.03.2018    source источник


Ответы (3)


Если вы хотите использовать из определенной темы, создайте конечную точку подписки вместо конечной точки получения и укажите тему и имя подписки в конфигурации.

Самая простая форма показана в модульных тестах:

https://github.com/MassTransit/MassTransit/blob/develop/tests/MassTransit.Azure.ServiceBus.Core.Tests/Subscription_Specs.cs

person Chris Patterson    schedule 24.03.2018
comment
Спасибо, это должно сработать. Однако в одной из моих служб я ограничен MassTransit 3.3.5 (из-за старого Newtonsoft.Json), где метод SubscriptionEndpoint не представлен. Можете ли вы указать, какую тему использовать и в этой версии? - person OriginalUtter; 26.03.2018
comment
Эта версия не поддерживает. - person Chris Patterson; 26.03.2018
comment
Обновлено. Конечно, это снова изменится, когда выйдет v7. - person Chris Patterson; 26.05.2020
comment
Ссылка мертва ... - person Robotronx; 26.04.2021
comment
Спасибо, обновился. - person Chris Patterson; 26.04.2021

Принятый ответ проясняет сторону подписки:

cfg.SubscriptionEndpoint(
    host,
    "sub-1",
    "my-topic-1",
    e =>
    {
        e.ConfigureConsumer<TestConsumer>(provider);
    });

Для тех, кто задается вопросом, как получить конфигурацию шины прямо на стороне публикации, это должно выглядеть так:

cfg.Message<TestMessage>(x =>
{
    x.SetEntityName("my-topic-1");
});

Затем вы можете вызвать публикацию в автобусе:

await bus.Publish<TestMessage>(message);

Спасибо @ChrisPatterson за укажи мне на это!

person Simon Ness    schedule 26.08.2019
comment
Я не знаю, почему официальная документация не так ясна. Мне очень трудно понять. - person Robotronx; 26.04.2021

Мне удалось отправить в тему служебной шины Azure с помощью _sendEndpointProvider.GetSendEndpoint (новый Uri (topic: shape)); где ... shape - это имя темы.

public class MassTransitController : ControllerBase
{
    private readonly ILogger<MassTransitController> _logger;
    private readonly ISendEndpointProvider _sendEndpointProvider;

    public MassTransitController(ILogger<MassTransitController> logger, ISendEndpointProvider sendEndpointProvider)
    {
        _logger = logger;
        _sendEndpointProvider = sendEndpointProvider;
    }

    [HttpGet]
    public async Task<IActionResult> Get()
    {
        try
        {
            var randomType = new Random();
            var randomColor = new Random();

            var shape = new Shape();
            shape.ShapeId = Guid.NewGuid();
            shape.Color = ShapeType.ShapeColors[randomColor.Next(ShapeType.ShapeColors.Count)];
            shape.Type = ShapeType.ShapeTypes[randomType.Next(ShapeType.ShapeTypes.Count)];

            var endpoint = await _sendEndpointProvider.GetSendEndpoint(new Uri("topic:shape"));
            await endpoint.Send(shape);

            return Ok(shape);
        }
        catch (Exception ex)
        {
            throw ex;
        }
    }
}

Я также смог заставить .NET 5 Worker Consumer работать с таким кодом ... где суб-все подписки будет улавливать все формы ... Я собираюсь сделать репозиторий в блоге / git об этом.

    public static IHostBuilder CreateHostBuilder(string[] args) =>
        Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddMassTransit(x =>
                {
                    x.UsingAzureServiceBus((context, cfg) =>
                    {
                        cfg.Host("Endpoint=sb://******");

                        cfg.SubscriptionEndpoint(
                            "sub-all",
                            "shape",
                            e =>
                        {
                            e.Handler<Shape>(async context =>
                            {
                                await Console.Out.WriteLineAsync($"Shape Received: {context.Message.Type}");
                            });

                            e.MaxDeliveryCount = 15;
                        });
                    });
                });

                services.AddMassTransitHostedService();
            });
person mrjamiebowman    schedule 09.04.2021