Стремясь создать надежные системы, мы часто сталкиваемся с необходимостью реализации длительных процессов, включающих сложные файловые операции и расширенные транзакции. Хотя моя предыдущая статья была посвящена управлению такими процессами с помощью фоновых задач FastAPI и асинхронных очередей, важно отметить, что для задач, требующих высокой отказоустойчивости, этих методов может быть недостаточно. Чтобы построить отказоустойчивую и слабосвязанную систему, следует рассмотреть возможность использования технологий источников событий, и в этом посте мы рассмотрим реализацию с использованием RabbitMQ.

Использование RabbitMQ

RabbitMQ является известным программным обеспечением-посредником сообщений с открытым исходным кодом, служащим каналом связи между отдельными компонентами в распределенной системе. Он опирается на расширенный протокол очереди сообщений (AMQP) для обеспечения эффективного и надежного обмена сообщениями. Универсальность, масштабируемость и поддержка множества языков программирования RabbitMQ делают его привлекательным выбором для создания адаптивных и отказоустойчивых распределенных приложений.

Теперь давайте углубимся в реализацию кода.

Начнем с того, что RabbitMQ можно легко настроить, при этом Docker является распространенным и простым выбором. Чтобы установить RabbitMQ с помощью Docker, выполните следующую команду:

docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management

Проверить, работает ли контейнер, можно с помощью этой команды:

docker ps --filter "name=some-rabbit"

Ваш результат должен выглядеть следующим образом:

CONTAINER ID   IMAGE                   COMMAND                  CREATED       STATUS       PORTS                                                                                                        NAMES
972eff8fb287   rabbitmq:3-management   "docker-entrypoint.s…"   3 hours ago   Up 3 hours   4369/tcp, 5671/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:5672->5672/tcp, 0.0.0.0:8080->15672/tcp   some-rabbit

Настройка кода FastAPI

Для подключения к RabbitMQ нам потребуется клиентская библиотека. Отличным выбором является propan, многофункциональная клиентская библиотека AMQP, которая одновременно является декларативной и разработана с учетом подсказок типов. Посетите их github.

Чтобы установить пропан, запустите:

pip install "propan[async-rabbit]"

Теперь давайте установим соединение с контейнером RabbitMQ:

broker = RabbitBroker("amqp://guest:guest@localhost:5672/")

Далее мы создадим обработчик событий для получения сообщений из очереди:

@broker.handle(queue="fastapi")
async def base_handler(body):
    print(f"This is the queue result :: {body}")

Чтобы гарантировать, что соединение брокера соответствует жизненному циклу приложения FastAPI, мы привяжем его к событиям запуска и завершения работы:

app = FastAPI()

@app.on_event("shutdown")
async def shutdown():
    await broker.close()


@app.on_event("startup")
async def on_start_up():
     # Starts the broker 
     await broker.start()

Наконец, давайте создадим простой маршрут производителя для тестирования. В производственной системе рекомендуется иметь отдельные API или клиенты для производителей и потребителей, создавая слабосвязанную архитектуру, которая легко восстанавливается после сбоев:

@app.post("/external-queue")
async def rabbitmq_queue():
    print("Queueing a job Using Rabbit MQ")
    await broker.publish(
                      queue='fastapi',
                      message=str(random.randint(1, 100))
    )
    
    return {"result": "success"}

Объединим все воедино

import random

from fastapi import FastAPI
from propan import RabbitBroker


broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
app = FastAPI()

@broker.handle(queue="fastapi")
async def base_handler(body):
    print(f"This is the queue result :: {body}")

@app.on_event("shutdown")
async def shutdown():
    await broker.close()


@app.on_event("startup")
async def on_start_up():
     # Starts the broker 
     await broker.start()

@app.post("/external-queue")
async def rabbitmq_queue():
    print("Queueing a job Using Rabbit MQ")
    await broker.publish(
                      queue='fastapi',
                      message=str(random.randint(1, 100))
    )
    
    return {"result": "success"}

Теперь вы можете отправить несколько запросов post к конечной точке /external-queue, чтобы поставить в очередь долго выполняющиеся процессы. Ваши журналы должны выглядеть следующим образом:

INFO:     Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit)
INFO:     Started reloader process [7672] using WatchFiles
INFO:     Started server process [7676]
INFO:     Waiting for application startup.
2023-09-06 00:16:42,197 INFO     - default | fastapi |            - `base_handler` waiting for messages
INFO:     Application startup complete.
Queueing a job Using Rabbit MQ
INFO:     127.0.0.1:54138 - "POST /external-queue HTTP/1.1" 200 OK
2023-09-06 00:16:44,975 INFO     - default | fastapi | 3b2296cc8a - Received
This is the queue result :: 47
2023-09-06 00:16:44,979 INFO     - default | fastapi | 3b2296cc8a - Processed
Queueing a job Using Rabbit MQ
INFO:     127.0.0.1:54138 - "POST /external-queue HTTP/1.1" 200 OK
2023-09-06 00:16:45,227 INFO     - default | fastapi | 0a9c20746a - Received
This is the queue result :: 98

Включив RabbitMQ в свою архитектуру, вы снабжаете свою систему надежной и отказоустойчивой основой обмена сообщениями, повышая ее отказоустойчивость и скорость реагирования.

Полный код можно найти на моем GitHub.

Спасибо, что дочитали до конца. Пожалуйста, подумайте о том, чтобы подписаться на автора и эту публикацию. Посетите Stackademic, чтобы узнать больше о том, как мы демократизируем бесплатное образование в области программирования во всем мире.