MassTransit использует сообщение, отличное от MassTransit

У меня есть консольное приложение, которое публикует сообщения для обмена RabbitMQ. Может ли подписчик, созданный с помощью MassTransit, использовать это сообщение?

Это код издателя:

    public virtual void Send(LogEntryMessage message)
    {

        using (var connection = _factory.CreateConnection())
        using (var channel = connection.CreateModel())
        {
            var props = channel.CreateBasicProperties();
            props.CorrelationId = Guid.NewGuid().ToString();

            var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

            channel.BasicPublish(exchange: _endpointConfiguration.Exchange, routingKey: _endpointConfiguration.RoutingKey, basicProperties: null,
                body: body);
        }
    }

Это код абонента:

      IBusControl ConfigureBus()
      {
        return Bus.Factory.CreateUsingRabbitMq(cfg =>
        {
            var host = cfg.Host(new Uri("rabbitmq://localhost"), h =>
            {
                h.Username(username);
                h.Password(password);
            });

            cfg.ReceiveEndpoint(host, "LogEntryQueue", e =>
            {
                e.Handler<LogEntryMessage>(context =>
                Console.Out.WriteLineAsync($"Value was entered: {context.Message.MessageBody}"));
            });
        });
    }

Это потребительский код:

    public class LogEntryMessageProcessor : IConsumer<LogEntryMessage>
    {
        public Task Consume(ConsumeContext<LogEntryMessage> context)
        {
            Console.Out.WriteLineAsync($"Value was entered: 
                      {context.Message.Message.MessageBody}");
            return Task.FromResult(0);
        }
    }

person cksanjose    schedule 15.06.2017    source источник


Ответы (2)


Я надеюсь, что вы сможете получить ответ в разделе Совместимость, в частности посмотрите на пример сообщения.

По сути, вам нужно создать объект JSON в соответствии с некоторыми простыми правилами.

Пример сообщения выглядит так:

{
    "destinationAddress": "rabbitmq://localhost/input_queue",
    "headers": {},
    "message": {
        "value": "Some Value",
        "customerId": 27
    },
    "messageType": [
        "urn:message:MassTransit.Tests:ValueMessage"
    ]
}

Вы можете легко проверить, как выглядят более сложные сообщения, создав и издателя, и потребителя, запустите программу для создания привязок, затем остановите потребителя и опубликуйте несколько сообщений. Они будут в очереди подписчиков, поэтому вы можете легко прочитать их с помощью подключаемого модуля управления.

person Alexey Zimarev    schedule 15.06.2017
comment
Спасибо за совет. Мне удалось опубликовать и подписаться, но обработчик сообщений не может сериализовать сообщение. Контекст сообщения MassTransit был создан и содержит объект сообщения, но все свойства сообщения равны нулю. - person cksanjose; 16.06.2017
comment
Если вы сможете составить образец репозитория на github, я, вероятно, найду время и посмотрю на него. - person Alexey Zimarev; 16.06.2017
comment
Спасибо! Любая помощь приветствуется! - person cksanjose; 17.06.2017
comment
Я создал github.com/cksanjose/masstransit_pubsub и github.com/cksanjose/masstransit_subscriber. Первый - это обычный издатель RabbitMQ. Подписчик - MassTransit и RabbitMQ. - person cksanjose; 19.06.2017
comment
Похоже на это github.com/cksanjose / masstransit_pubsub / blob / master / должен быть LogEntryPayload, и ваш потребитель должен использовать этот тип вместо внешнего типа, который, по сути, является вашей версией конверта сообщения MT. - person Chris Patterson; 19.06.2017
comment
Спасибо! Я тоже это понял. Я загрузил исходный код MT и смог отладить его. - person cksanjose; 19.06.2017

Чтобы MassTransit мог обработать сообщение, опубликованное клиентом, отличным от MassTransit, сообщение должно содержать метаданные, необходимые для MassTransit, как описано в Взаимодействие. Потребитель сообщения должен обработать полезную нагрузку сообщения. В приведенном ниже коде полезной нагрузкой является LogEntryPayload:

public class LogEntryMessageProcessor : IConsumer<LogEntryPayload>
{
    public Task Consume(ConsumeContext<LogEntryPayload> context)
    {
        //var payload = context.GetPayload<LogEntryPayload>();
        Console.Out.WriteLineAsync($"Value was entered: {context.Message.Id} - {context.Message.MessageBody}");
        return Task.FromResult(0);
    }
}
person cksanjose    schedule 19.06.2017