Что такое МассТранзит?
MassTransit — это платформа распределенных приложений с открытым исходным кодом для создания систем на основе сообщений в экосистеме .NET. Он обеспечивает абстракцию высокого уровня для отправки и получения сообщений между компонентами, позволяя разработчикам создавать несвязанные, масштабируемые и удобные в сопровождении приложения.
MassTransit упрощает связь между различными компонентами приложения с помощью инфраструктуры обмена сообщениями, обычно основанной на очередях сообщений. Он поддерживает различные технологии передачи сообщений, в том числе популярные, такие как RabbitMQ, ActiveMQ, Azure Service Bus и Amazon Simple Queue Service (SQS).
Используя шаблон обмена сообщениями, MassTransit обеспечивает слабую связь между различными частями системы. Он способствует использованию сообщений в качестве основного средства связи, а не прямого вызова методов или общих баз данных. Такой подход повышает масштабируемость, отказоустойчивость и ремонтопригодность.
Ключевые особенности MassTransit включают в себя:
- Связь на основе сообщений: MassTransit поощряет использование сообщений в качестве основной единицы связи между компонентами. Сообщения строго типизированы и определяют обмен данными между различными частями системы.
- Модель публикации/подписки: MassTransit поддерживает шаблон публикации/подписки, позволяющий компонентам публиковать сообщения в определенной теме или событии. Компоненты с подпиской получают соответствующие сообщения, не требуя прямого знания издателей.
- Модель запроса/ответа: MassTransit поддерживает шаблон запроса/ответа, когда один компонент отправляет сообщение запроса, а другой компонент отвечает соответствующим ответным сообщением. Этот шаблон обеспечивает синхронную связь между компонентами.
- Поддержка саги/конечного автомата: MassTransit предоставляет структуру саги/конечного автомата, которая помогает управлять длительными многоэтапными процессами в распределенных системах. Sagas позволяет координировать действия нескольких служб, сохраняя при этом согласованность и надежность данных.
- Отказоустойчивость и повторные попытки: MassTransit включает встроенные механизмы для обработки сбоев сообщений и выполнения повторных попыток. Он предоставляет настраиваемые политики повторных попыток, очереди недоставленных сообщений и стратегии обработки ошибок для обеспечения надежной обработки сообщений.
- Расширяемость и подключаемость: MassTransit обладает высокой расширяемостью и позволяет разработчикам интегрировать настраиваемые транспорты, сериализаторы и промежуточное ПО в конвейер обмена сообщениями. Такая гибкость обеспечивает возможность настройки и интеграции с различными технологиями.
Шаг 1:
Создайте новый проект веб-API ASP.NET Core в Visual Studio или предпочитаемой среде разработки.
Шаг 2:
Установите необходимые пакеты NuGet. В консоли диспетчера пакетов NuGet выполните следующие команды:
Install-Package Microsoft.AspNetCore.Mvc -Version 6.0.0 Install-Package MassTransit -Version 7.1.12 Install-Package MassTransit.AspNetCore -Version 7.1.12 Install-Package MassTransit.RabbitMQ -Version 7.1.12
Шаг 3:
Создайте в проекте новую папку под названием «Сообщения». Внутри папки «Сообщения» создайте класс «ProductCreatedMessage.cs» со следующим содержимым:
public class ProductCreatedMessage { public int ProductId { get; set; } public string Name { get; set; } public decimal Price { get; set; } }
Шаг 4:
Откройте файл Startup.cs и добавьте необходимый код для настройки DI и MassTransit. Измените метод ConfigureServices следующим образом:
using MassTransit; public void ConfigureServices(IServiceCollection services) { services.AddControllers(); services.AddMassTransit(x => { x.UsingRabbitMq((context, cfg) => { cfg.Host("rabbitmq://localhost"); // Replace with your RabbitMQ host cfg.ConfigureEndpoints(context); }); }); services.AddMassTransitHostedService(); }
Шаг 5:
Создайте новую папку под названием «Контроллеры» в проекте. Внутри папки «Контроллеры» создайте класс «ProductController.cs» со следующим содержимым:
using System; using System.Threading.Tasks; using MassTransit; using Microsoft.AspNetCore.Mvc; [ApiController] [Route("[controller]")] public class ProductController : ControllerBase { private readonly IPublishEndpoint _publishEndpoint; public ProductController(IPublishEndpoint publishEndpoint) { _publishEndpoint = publishEndpoint; } [HttpPost] public async Task<IActionResult> Create(ProductCreateRequest request) { // Save the product to the database or perform any necessary actions // Publish a message indicating that a new product was created await _publishEndpoint.Publish(new ProductCreatedMessage { ProductId = 1, // Replace with the actual product ID Name = request.Name, Price = request.Price }); return Ok(); } } public class ProductCreateRequest { public string Name { get; set; } public decimal Price { get; set; } }
Шаг 6:
Измените файл HomeController.cs, чтобы удалить код шаблона API по умолчанию.
Шаг 7:
Соберите и запустите приложение. Вы можете использовать такой инструмент, как Postman, для отправки запроса POST на http://localhost:<port>/product
со следующей полезной нагрузкой JSON:
{ "name": "Example Product", "price": 9.99 }
Шаг 8:
Чтобы использовать опубликованные сообщения, вы можете создать потребителя. Создайте в проекте новую папку под названием «Потребители». Внутри папки «Потребители» создайте класс «ProductCreatedConsumer.cs» со следующим содержимым:
using System.Threading.Tasks; using MassTransit; using Microsoft.Extensions.Logging; public class ProductCreatedConsumer : IConsumer<ProductCreatedMessage> { private readonly ILogger<ProductCreatedConsumer> _logger; public ProductCreatedConsumer(ILogger<ProductCreatedConsumer> logger) { _logger = logger; } public async Task Consume(ConsumeContext<ProductCreatedMessage> context) { // Perform the necessary actions upon receiving the message _logger.LogInformation($"Product created - ID: {context.Message.ProductId}, Name: {context.Message.Name}, Price: {context.Message.Price}"); await Task.CompletedTask; } }
Шаг 9:
Зарегистрируйте потребителя в методе ConfigureServices файла Startup.cs:
using MassTransit; using Microsoft.Extensions.DependencyInjection; public void ConfigureServices(IServiceCollection services) { // ... services.AddMassTransit(x => { // ... x.AddConsumer<ProductCreatedConsumer>(); }); services.AddScoped<ProductCreatedConsumer>(); // ... }
Шаг 10:
Измените метод ConfigureServices, чтобы включить конфигурацию конечной точки потребителя:
using MassTransit; public void ConfigureServices(IServiceCollection services) { // ... services.AddMassTransit(x => { x.UsingRabbitMq((context, cfg) => { // ... cfg.ConfigureEndpoints(context); }); }); services.AddMassTransitHostedService(); // ... }
Определите типы сообщений, которыми будут обмениваться между издателем и подписчиками. Например, давайте создадим простое сообщение о событии с именем OrderCompletedEvent
:
public class OrderCompletedEvent { public int OrderId { get; set; } public string CustomerName { get; set; } }
Настройте MassTransit и настройте конечные точки сообщений. В файле Program.cs
добавьте следующий код:
using MassTransit; var bus = Bus.Factory.CreateUsingInMemory(configure => { configure.ReceiveEndpoint("order_completed_queue", endpoint => { endpoint.Handler<OrderCompletedEvent>(context => { var orderCompletedEvent = context.Message; Console.WriteLine($"Order completed: OrderId={orderCompletedEvent.OrderId}, CustomerName={orderCompletedEvent.CustomerName}"); return Task.CompletedTask; }); }); }); await bus.StartAsync(); Console.WriteLine("Publishing events. Press any key to exit."); Console.ReadKey(); await bus.StopAsync();
В приведенном выше коде мы создаем шину в памяти, используя CreateUsingInMemory
, и настраиваем конечную точку приема с именем «order_completed_queue», которая обрабатывает сообщения типа OrderCompletedEvent
. Внутри обработчика мы просто выводим полученное событие в консоль.
var orderCompletedEvent = new OrderCompletedEvent { OrderId = 1, CustomerName = "John Doe" }; await bus.Publish(orderCompletedEvent);
Обработка ошибок. Чтобы реализовать обработку ошибок в MassTransit, вы можете настроить политики повторных попыток и очереди ошибок. Вот пример настройки политики повторных попыток с экспоненциальной отсрочкой и перемещением неудачных сообщений в очередь ошибок:
services.AddMassTransit(x => { x.UsingRabbitMq((context, cfg) => { // ... cfg.UseMessageRetry(r => { r.Exponential(5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2)); }); cfg.UseInMemoryOutbox(); // Optionally use an in-memory outbox to prevent duplicate message processing cfg.ConfigureEndpoints(context); cfg.UseScheduledRedelivery(r => { r.Intervals(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(30)); }); }); });
Сериализация сообщений. Чтобы настроить формат сериализации сообщений, вы можете использовать различные сериализаторы. Вот пример настройки MassTransit для использования сериализации Protobuf:
services.AddMassTransit(x => { x.UsingRabbitMq((context, cfg) => { // ... cfg.UseMessageDataSerialization(); cfg.UseJsonSerializer(); // or cfg.UseProtoBufSerializer(); // ... }); });
Управление версиями сообщений. Чтобы управлять версиями сообщений, вы можете включить информацию о версиях в контракт сообщения и обрабатывать разные версии в потребителе. Вот пример:
// ProductCreatedMessage Version 1 public class ProductCreatedMessageV1 { public int ProductId { get; set; } public string Name { get; set; } public decimal Price { get; set; } } // ProductCreatedMessage Version 2 public class ProductCreatedMessageV2 { public int ProductId { get; set; } public string Name { get; set; } public decimal Price { get; set; } public DateTime CreatedAt { get; set; } } // Consumer public class ProductCreatedConsumer : IConsumer<ProductCreatedMessageV1>, IConsumer<ProductCreatedMessageV2> { public Task Consume(ConsumeContext<ProductCreatedMessageV1> context) { // Handle Version 1 message return Task.CompletedTask; } public Task Consume(ConsumeContext<ProductCreatedMessageV2> context) { // Handle Version 2 message return Task.CompletedTask; } }
Проверка сообщений. Для проверки входящих сообщений можно использовать библиотеку проверки, например FluentValidation. Вот пример использования FluentValidation для проверки сообщений:
// ProductCreatedMessage public class ProductCreatedMessage { public int ProductId { get; set; } public string Name { get; set; } public decimal Price { get; set; } } // ProductCreatedMessageValidator using FluentValidation public class ProductCreatedMessageValidator : AbstractValidator<ProductCreatedMessage> { public ProductCreatedMessageValidator() { RuleFor(x => x.ProductId).NotEmpty(); RuleFor(x => x.Name).NotEmpty().Length(1, 100); RuleFor(x => x.Price).GreaterThan(0); } } // Consumer with message validation public class ProductCreatedConsumer : IConsumer<ProductCreatedMessage> { public async Task Consume(ConsumeContext<ProductCreatedMessage> context) { var validationResult = await new ProductCreatedMessageValidator().ValidateAsync(context.Message); if (!validationResult.IsValid) { // Handle validation errors return; } // Process the message } }
Ведение журнала и мониторинг. Чтобы расширить возможности ведения журнала и мониторинга в вашем приложении, вы можете интегрировать платформу ведения журнала, например Serilog. Вот пример настройки Serilog для регистрации сообщений MassTransit:
using Serilog; using Serilog.Events; using MassTransit; using MassTransit.SerilogIntegration; // In the ConfigureServices method of Startup.cs services.AddMassTransit(x => { x.UsingRabbitMq((context, cfg) => { // ... cfg.ConfigureEndpoints(context); cfg.UseSerilog(); // ... }); });
Убедитесь, что вы правильно настроили Serilog в своем приложении и настроили его для записи событий журнала в нужный приемник журналов.
Безопасность. Чтобы реализовать меры безопасности для вашей инфраструктуры обмена сообщениями, вы можете включить безопасность и аутентификацию на транспортном уровне. Вот пример настройки шифрования TLS и установки имени пользователя/пароля для RabbitMQ:
services.AddMassTransit(x => { x.UsingRabbitMq((context, cfg) => { cfg.Host("rabbitmq://localhost", host => { host.Username("guest"); host.Password("guest"); host.UseSsl(ssl => { ssl.Protocol = System.Security.Authentication.SslProtocols.Tls12; }); }); cfg.ConfigureEndpoints(context); // ... }); });
Обязательно замените имя пользователя и пароль своими фактическими учетными данными RabbitMQ.
Масштабируемость и производительность. Чтобы оптимизировать масштабируемость и производительность, вы можете выбрать другую транспортную технологию в зависимости от ваших требований. Вот пример использования служебной шины Azure в качестве транспорта:
using MassTransit.Azure.ServiceBus.Core; services.AddMassTransit(x => { x.UsingAzureServiceBus((context, cfg) => { cfg.Host("your_connection_string"); cfg.ConfigureEndpoints(context); // ... }); });
Замените your_connection_string фактической строкой подключения для вашего экземпляра служебной шины Azure.
Модульное и интеграционное тестирование. Для модульного и интеграционного тестирования можно использовать такие платформы, как xUnit или NUnit. Вот пример написания модульного теста для потребителя с использованием xUnit:
using Xunit; using Moq; using MassTransit; using Microsoft.Extensions.Logging; public class ProductCreatedConsumerTests { [Fact] public async Task Consume_ProductCreatedMessage_LoggerCalled() { // Arrange var loggerMock = new Mock<ILogger<ProductCreatedConsumer>>(); var consumer = new ProductCreatedConsumer(loggerMock.Object); var contextMock = new Mock<ConsumeContext<ProductCreatedMessage>>(); var message = new ProductCreatedMessage { ProductId = 1, Name = "Test", Price = 9.99m }; contextMock.Setup(x => x.Message).Returns(message); // Act await consumer.Consume(contextMock.Object); // Assert loggerMock.Verify( x => x.LogInformation( $"Product created - ID: {message.ProductId}, Name: {message.Name}, Price: {message.Price}" ), Times.Once ); } }
В этом примере мы используем библиотеку Moq для создания фиктивного регистратора и фиктивного контекста потребления, что позволяет нам тестировать поведение потребителя.
Не забудьте скорректировать и адаптировать эти примеры кода в соответствии с вашими конкретными потребностями и средами тестирования.