Что такое МассТранзит?

MassTransit — это платформа распределенных приложений с открытым исходным кодом для создания систем на основе сообщений в экосистеме .NET. Он обеспечивает абстракцию высокого уровня для отправки и получения сообщений между компонентами, позволяя разработчикам создавать несвязанные, масштабируемые и удобные в сопровождении приложения.

MassTransit упрощает связь между различными компонентами приложения с помощью инфраструктуры обмена сообщениями, обычно основанной на очередях сообщений. Он поддерживает различные технологии передачи сообщений, в том числе популярные, такие как RabbitMQ, ActiveMQ, Azure Service Bus и Amazon Simple Queue Service (SQS).

Используя шаблон обмена сообщениями, MassTransit обеспечивает слабую связь между различными частями системы. Он способствует использованию сообщений в качестве основного средства связи, а не прямого вызова методов или общих баз данных. Такой подход повышает масштабируемость, отказоустойчивость и ремонтопригодность.

Ключевые особенности MassTransit включают в себя:

  1. Связь на основе сообщений: MassTransit поощряет использование сообщений в качестве основной единицы связи между компонентами. Сообщения строго типизированы и определяют обмен данными между различными частями системы.
  2. Модель публикации/подписки: MassTransit поддерживает шаблон публикации/подписки, позволяющий компонентам публиковать сообщения в определенной теме или событии. Компоненты с подпиской получают соответствующие сообщения, не требуя прямого знания издателей.
  3. Модель запроса/ответа: MassTransit поддерживает шаблон запроса/ответа, когда один компонент отправляет сообщение запроса, а другой компонент отвечает соответствующим ответным сообщением. Этот шаблон обеспечивает синхронную связь между компонентами.
  4. Поддержка саги/конечного автомата: MassTransit предоставляет структуру саги/конечного автомата, которая помогает управлять длительными многоэтапными процессами в распределенных системах. Sagas позволяет координировать действия нескольких служб, сохраняя при этом согласованность и надежность данных.
  5. Отказоустойчивость и повторные попытки: MassTransit включает встроенные механизмы для обработки сбоев сообщений и выполнения повторных попыток. Он предоставляет настраиваемые политики повторных попыток, очереди недоставленных сообщений и стратегии обработки ошибок для обеспечения надежной обработки сообщений.
  6. Расширяемость и подключаемость: MassTransit обладает высокой расширяемостью и позволяет разработчикам интегрировать настраиваемые транспорты, сериализаторы и промежуточное ПО в конвейер обмена сообщениями. Такая гибкость обеспечивает возможность настройки и интеграции с различными технологиями.

Шаг 1:

Создайте новый проект веб-API ASP.NET Core в Visual Studio или предпочитаемой среде разработки.

Шаг 2:

Установите необходимые пакеты NuGet. В консоли диспетчера пакетов NuGet выполните следующие команды:

Install-Package Microsoft.AspNetCore.Mvc -Version 6.0.0
Install-Package MassTransit -Version 7.1.12
Install-Package MassTransit.AspNetCore -Version 7.1.12
Install-Package MassTransit.RabbitMQ -Version 7.1.12

Шаг 3:

Создайте в проекте новую папку под названием «Сообщения». Внутри папки «Сообщения» создайте класс «ProductCreatedMessage.cs» со следующим содержимым:

public class ProductCreatedMessage
{
    public int ProductId { get; set; }
    public string Name { get; set; }
    public decimal Price { get; set; }
}

Шаг 4:

Откройте файл Startup.cs и добавьте необходимый код для настройки DI и MassTransit. Измените метод ConfigureServices следующим образом:

using MassTransit;

public void ConfigureServices(IServiceCollection services)
{
    services.AddControllers();

    services.AddMassTransit(x =>
    {
        x.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host("rabbitmq://localhost"); // Replace with your RabbitMQ host

            cfg.ConfigureEndpoints(context);
        });
    });

    services.AddMassTransitHostedService();
}

Шаг 5:

Создайте новую папку под названием «Контроллеры» в проекте. Внутри папки «Контроллеры» создайте класс «ProductController.cs» со следующим содержимым:

using System;
using System.Threading.Tasks;
using MassTransit;
using Microsoft.AspNetCore.Mvc;

[ApiController]
[Route("[controller]")]
public class ProductController : ControllerBase
{
    private readonly IPublishEndpoint _publishEndpoint;

    public ProductController(IPublishEndpoint publishEndpoint)
    {
        _publishEndpoint = publishEndpoint;
    }

    [HttpPost]
    public async Task<IActionResult> Create(ProductCreateRequest request)
    {
        // Save the product to the database or perform any necessary actions

        // Publish a message indicating that a new product was created
        await _publishEndpoint.Publish(new ProductCreatedMessage
        {
            ProductId = 1, // Replace with the actual product ID
            Name = request.Name,
            Price = request.Price
        });

        return Ok();
    }
}
public class ProductCreateRequest
{
    public string Name { get; set; }
    public decimal Price { get; set; }
}

Шаг 6:

Измените файл HomeController.cs, чтобы удалить код шаблона API по умолчанию.

Шаг 7:

Соберите и запустите приложение. Вы можете использовать такой инструмент, как Postman, для отправки запроса POST на http://localhost:<port>/product со следующей полезной нагрузкой JSON:

{
    "name": "Example Product",
    "price": 9.99
}

Шаг 8:

Чтобы использовать опубликованные сообщения, вы можете создать потребителя. Создайте в проекте новую папку под названием «Потребители». Внутри папки «Потребители» создайте класс «ProductCreatedConsumer.cs» со следующим содержимым:

using System.Threading.Tasks;
using MassTransit;
using Microsoft.Extensions.Logging;

public class ProductCreatedConsumer : IConsumer<ProductCreatedMessage>
{
    private readonly ILogger<ProductCreatedConsumer> _logger;

    public ProductCreatedConsumer(ILogger<ProductCreatedConsumer> logger)
    {
        _logger = logger;
    }

    public async Task Consume(ConsumeContext<ProductCreatedMessage> context)
    {
        // Perform the necessary actions upon receiving the message
        _logger.LogInformation($"Product created - ID: {context.Message.ProductId}, Name: {context.Message.Name}, Price: {context.Message.Price}");

        await Task.CompletedTask;
    }
}

Шаг 9:

Зарегистрируйте потребителя в методе ConfigureServices файла Startup.cs:

using MassTransit;
using Microsoft.Extensions.DependencyInjection;

public void ConfigureServices(IServiceCollection services)
{
    // ...

    services.AddMassTransit(x =>
    {
        // ...

        x.AddConsumer<ProductCreatedConsumer>();
    });

    services.AddScoped<ProductCreatedConsumer>();

    // ...
}

Шаг 10:

Измените метод ConfigureServices, чтобы включить конфигурацию конечной точки потребителя:

using MassTransit;

public void ConfigureServices(IServiceCollection services)
{
    // ...

    services.AddMassTransit(x =>
    {
        x.UsingRabbitMq((context, cfg) =>
        {
            // ...

            cfg.ConfigureEndpoints(context);
        });
    });

    services.AddMassTransitHostedService();

    // ...
}

Определите типы сообщений, которыми будут обмениваться между издателем и подписчиками. Например, давайте создадим простое сообщение о событии с именем OrderCompletedEvent:

public class OrderCompletedEvent
{
    public int OrderId { get; set; }
    public string CustomerName { get; set; }
}

Настройте MassTransit и настройте конечные точки сообщений. В файле Program.cs добавьте следующий код:

using MassTransit;

var bus = Bus.Factory.CreateUsingInMemory(configure =>
{
    configure.ReceiveEndpoint("order_completed_queue", endpoint =>
    {
        endpoint.Handler<OrderCompletedEvent>(context =>
        {
            var orderCompletedEvent = context.Message;
            Console.WriteLine($"Order completed: OrderId={orderCompletedEvent.OrderId}, CustomerName={orderCompletedEvent.CustomerName}");
            return Task.CompletedTask;
        });
    });
});

await bus.StartAsync();

Console.WriteLine("Publishing events. Press any key to exit.");
Console.ReadKey();

await bus.StopAsync();

В приведенном выше коде мы создаем шину в памяти, используя CreateUsingInMemory, и настраиваем конечную точку приема с именем «order_completed_queue», которая обрабатывает сообщения типа OrderCompletedEvent. Внутри обработчика мы просто выводим полученное событие в консоль.

var orderCompletedEvent = new OrderCompletedEvent
{
    OrderId = 1,
    CustomerName = "John Doe"
};

await bus.Publish(orderCompletedEvent);

Обработка ошибок. Чтобы реализовать обработку ошибок в MassTransit, вы можете настроить политики повторных попыток и очереди ошибок. Вот пример настройки политики повторных попыток с экспоненциальной отсрочкой и перемещением неудачных сообщений в очередь ошибок:

services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        // ...

        cfg.UseMessageRetry(r =>
        {
            r.Exponential(5, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(2));
        });

        cfg.UseInMemoryOutbox(); // Optionally use an in-memory outbox to prevent duplicate message processing

        cfg.ConfigureEndpoints(context);

        cfg.UseScheduledRedelivery(r =>
        {
            r.Intervals(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(30));
        });
    });
});

Сериализация сообщений. Чтобы настроить формат сериализации сообщений, вы можете использовать различные сериализаторы. Вот пример настройки MassTransit для использования сериализации Protobuf:

services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        // ...

        cfg.UseMessageDataSerialization();

        cfg.UseJsonSerializer();

        // or

        cfg.UseProtoBufSerializer();
        
        // ...
    });
});

Управление версиями сообщений. Чтобы управлять версиями сообщений, вы можете включить информацию о версиях в контракт сообщения и обрабатывать разные версии в потребителе. Вот пример:

// ProductCreatedMessage Version 1
public class ProductCreatedMessageV1
{
    public int ProductId { get; set; }
    public string Name { get; set; }
    public decimal Price { get; set; }
}

// ProductCreatedMessage Version 2
public class ProductCreatedMessageV2
{
    public int ProductId { get; set; }
    public string Name { get; set; }
    public decimal Price { get; set; }
    public DateTime CreatedAt { get; set; }
}

// Consumer
public class ProductCreatedConsumer : IConsumer<ProductCreatedMessageV1>, IConsumer<ProductCreatedMessageV2>
{
    public Task Consume(ConsumeContext<ProductCreatedMessageV1> context)
    {
        // Handle Version 1 message
        return Task.CompletedTask;
    }

    public Task Consume(ConsumeContext<ProductCreatedMessageV2> context)
    {
        // Handle Version 2 message
        return Task.CompletedTask;
    }
}

Проверка сообщений. Для проверки входящих сообщений можно использовать библиотеку проверки, например FluentValidation. Вот пример использования FluentValidation для проверки сообщений:

// ProductCreatedMessage
public class ProductCreatedMessage
{
    public int ProductId { get; set; }
    public string Name { get; set; }
    public decimal Price { get; set; }
}

// ProductCreatedMessageValidator using FluentValidation
public class ProductCreatedMessageValidator : AbstractValidator<ProductCreatedMessage>
{
    public ProductCreatedMessageValidator()
    {
        RuleFor(x => x.ProductId).NotEmpty();
        RuleFor(x => x.Name).NotEmpty().Length(1, 100);
        RuleFor(x => x.Price).GreaterThan(0);
    }
}

// Consumer with message validation
public class ProductCreatedConsumer : IConsumer<ProductCreatedMessage>
{
    public async Task Consume(ConsumeContext<ProductCreatedMessage> context)
    {
        var validationResult = await new ProductCreatedMessageValidator().ValidateAsync(context.Message);
        if (!validationResult.IsValid)
        {
            // Handle validation errors
            return;
        }

        // Process the message
    }
}

Ведение журнала и мониторинг. Чтобы расширить возможности ведения журнала и мониторинга в вашем приложении, вы можете интегрировать платформу ведения журнала, например Serilog. Вот пример настройки Serilog для регистрации сообщений MassTransit:

using Serilog;
using Serilog.Events;
using MassTransit;
using MassTransit.SerilogIntegration;

// In the ConfigureServices method of Startup.cs
services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        // ...

        cfg.ConfigureEndpoints(context);

        cfg.UseSerilog();

        // ...
    });
});

Убедитесь, что вы правильно настроили Serilog в своем приложении и настроили его для записи событий журнала в нужный приемник журналов.

Безопасность. Чтобы реализовать меры безопасности для вашей инфраструктуры обмена сообщениями, вы можете включить безопасность и аутентификацию на транспортном уровне. Вот пример настройки шифрования TLS и установки имени пользователя/пароля для RabbitMQ:

services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("rabbitmq://localhost", host =>
        {
            host.Username("guest");
            host.Password("guest");
            host.UseSsl(ssl =>
            {
                ssl.Protocol = System.Security.Authentication.SslProtocols.Tls12;
            });
        });

        cfg.ConfigureEndpoints(context);

        // ...
    });
});

Обязательно замените имя пользователя и пароль своими фактическими учетными данными RabbitMQ.

Масштабируемость и производительность. Чтобы оптимизировать масштабируемость и производительность, вы можете выбрать другую транспортную технологию в зависимости от ваших требований. Вот пример использования служебной шины Azure в качестве транспорта:

using MassTransit.Azure.ServiceBus.Core;

services.AddMassTransit(x =>
{
    x.UsingAzureServiceBus((context, cfg) =>
    {
        cfg.Host("your_connection_string");

        cfg.ConfigureEndpoints(context);

        // ...
    });
});

Замените your_connection_string фактической строкой подключения для вашего экземпляра служебной шины Azure.

Модульное и интеграционное тестирование. Для модульного и интеграционного тестирования можно использовать такие платформы, как xUnit или NUnit. Вот пример написания модульного теста для потребителя с использованием xUnit:

using Xunit;
using Moq;
using MassTransit;
using Microsoft.Extensions.Logging;

public class ProductCreatedConsumerTests
{
    [Fact]
    public async Task Consume_ProductCreatedMessage_LoggerCalled()
    {
        // Arrange
        var loggerMock = new Mock<ILogger<ProductCreatedConsumer>>();
        var consumer = new ProductCreatedConsumer(loggerMock.Object);
        var contextMock = new Mock<ConsumeContext<ProductCreatedMessage>>();
        var message = new ProductCreatedMessage { ProductId = 1, Name = "Test", Price = 9.99m };
        contextMock.Setup(x => x.Message).Returns(message);

        // Act
        await consumer.Consume(contextMock.Object);

        // Assert
        loggerMock.Verify(
            x => x.LogInformation(
                $"Product created - ID: {message.ProductId}, Name: {message.Name}, Price: {message.Price}"
            ),
            Times.Once
        );
    }
}

В этом примере мы используем библиотеку Moq для создания фиктивного регистратора и фиктивного контекста потребления, что позволяет нам тестировать поведение потребителя.

Не забудьте скорректировать и адаптировать эти примеры кода в соответствии с вашими конкретными потребностями и средами тестирования.