Единица работы при использовании MassTransit

Я ищу способ подключиться к конвейеру обработки сообщений и выполнить некоторую работу после того, как потребитель закончит обработку некоторого сообщения. Мое намерение состоит в том, чтобы открыть новый сеанс и начать транзакцию (можно выполнить контейнер IoC) перед обработкой и удалением их сразу после него.

В NServiceBus я бы использовал интерфейс IMessageModule для подключения. Есть ли что-то похожее на него? На самом деле удаление обработчика также может помочь, но поскольку я использую StructureMap в качестве ObjectBuilder, метод Release просто ничего не делает.


person Mouk    schedule 06.04.2011    source источник
comment
Обратите внимание, что если вы используете сообщения из очереди транзакций с помощью MSMQ, сообщения считываются в транзакции, и любые операции с базой данных с использованием NHibernate и т. д. могут участвовать в этой отдельной транзакции.   -  person Chris Patterson    schedule 06.04.2011


Ответы (2)


Вы можете зарегистрировать перехватчик, который будет вызываться до и после потребления каждого сообщения. Например:

        LocalBus = ServiceBusConfigurator.New(x =>
            {
                x.ReceiveFrom("loopback://localhost/mt_client");

                x.BeforeConsumingMessage(() => { _before.Set(); });
                x.AfterConsumingMessage(() => { _after.Set(); });
            });

Взгляните на файл MessageInterceptor_Specs.cs в проекте MassTransit.Tests для рабочего модульного теста.

person Chris Patterson    schedule 06.04.2011

Я прошел через то же самое испытание, и вот как я это сделал. У нас были IUnitOfWork и ITransaction: с помощью Commit() и Rollback() мы добавили класс для TransactionalOperation: ITransaction, чтобы добавить поддержку транзакционного поведения для вещей, не связанных с БД. IUnitOfWork.Commit() перебирает список TransactionalOperations, которые могли быть к нему добавлены.

Теперь, чтобы привязать шину к нашей системе: Добавлен IBus для обертывания внешней шины Реализован MassTransitBusGateway: IBus, затем для привязки шины к единице работы: Реализован UnitOfWorkBus: IBus (декоратор) — этот декоратор делает любые вызовы Publish() знать о единице работы и добавить ее как TransactionalOperation, чтобы ее выполнение было отложено до тех пор, пока UnitOfWork.Commit()

Таким образом, мы абстрагируем конкретную шину, тем самым избегая добавления зависимостей MassTransit от нескольких проектов (это действительно нужно только клиентам) и добавляем транзакционное поведение к его операциям.

person Fabricio    schedule 23.12.2013