Стремясь создать надежные системы, мы часто сталкиваемся с необходимостью реализации длительных процессов, включающих сложные файловые операции и расширенные транзакции. Хотя моя предыдущая статья была посвящена управлению такими процессами с помощью фоновых задач FastAPI и асинхронных очередей, важно отметить, что для задач, требующих высокой отказоустойчивости, этих методов может быть недостаточно. Чтобы построить отказоустойчивую и слабосвязанную систему, следует рассмотреть возможность использования технологий источников событий, и в этом посте мы рассмотрим реализацию с использованием RabbitMQ.
Использование RabbitMQ
RabbitMQ является известным программным обеспечением-посредником сообщений с открытым исходным кодом, служащим каналом связи между отдельными компонентами в распределенной системе. Он опирается на расширенный протокол очереди сообщений (AMQP) для обеспечения эффективного и надежного обмена сообщениями. Универсальность, масштабируемость и поддержка множества языков программирования RabbitMQ делают его привлекательным выбором для создания адаптивных и отказоустойчивых распределенных приложений.
Теперь давайте углубимся в реализацию кода.
Начнем с того, что RabbitMQ можно легко настроить, при этом Docker является распространенным и простым выбором. Чтобы установить RabbitMQ с помощью Docker, выполните следующую команду:
docker run -d --hostname my-rabbit --name some-rabbit -p 8080:15672 -p 5672:5672 rabbitmq:3-management
Проверить, работает ли контейнер, можно с помощью этой команды:
docker ps --filter "name=some-rabbit"
Ваш результат должен выглядеть следующим образом:
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES 972eff8fb287 rabbitmq:3-management "docker-entrypoint.s…" 3 hours ago Up 3 hours 4369/tcp, 5671/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:5672->5672/tcp, 0.0.0.0:8080->15672/tcp some-rabbit
Настройка кода FastAPI
Для подключения к RabbitMQ нам потребуется клиентская библиотека. Отличным выбором является propan, многофункциональная клиентская библиотека AMQP, которая одновременно является декларативной и разработана с учетом подсказок типов. Посетите их github.
Чтобы установить пропан, запустите:
pip install "propan[async-rabbit]"
Теперь давайте установим соединение с контейнером RabbitMQ:
broker = RabbitBroker("amqp://guest:guest@localhost:5672/")
Далее мы создадим обработчик событий для получения сообщений из очереди:
@broker.handle(queue="fastapi") async def base_handler(body): print(f"This is the queue result :: {body}")
Чтобы гарантировать, что соединение брокера соответствует жизненному циклу приложения FastAPI, мы привяжем его к событиям запуска и завершения работы:
app = FastAPI() @app.on_event("shutdown") async def shutdown(): await broker.close() @app.on_event("startup") async def on_start_up(): # Starts the broker await broker.start()
Наконец, давайте создадим простой маршрут производителя для тестирования. В производственной системе рекомендуется иметь отдельные API или клиенты для производителей и потребителей, создавая слабосвязанную архитектуру, которая легко восстанавливается после сбоев:
@app.post("/external-queue") async def rabbitmq_queue(): print("Queueing a job Using Rabbit MQ") await broker.publish( queue='fastapi', message=str(random.randint(1, 100)) ) return {"result": "success"}
Объединим все воедино
import random from fastapi import FastAPI from propan import RabbitBroker broker = RabbitBroker("amqp://guest:guest@localhost:5672/") app = FastAPI() @broker.handle(queue="fastapi") async def base_handler(body): print(f"This is the queue result :: {body}") @app.on_event("shutdown") async def shutdown(): await broker.close() @app.on_event("startup") async def on_start_up(): # Starts the broker await broker.start() @app.post("/external-queue") async def rabbitmq_queue(): print("Queueing a job Using Rabbit MQ") await broker.publish( queue='fastapi', message=str(random.randint(1, 100)) ) return {"result": "success"}
Теперь вы можете отправить несколько запросов post
к конечной точке /external-queue
, чтобы поставить в очередь долго выполняющиеся процессы. Ваши журналы должны выглядеть следующим образом:
INFO: Uvicorn running on http://127.0.0.1:8000 (Press CTRL+C to quit) INFO: Started reloader process [7672] using WatchFiles INFO: Started server process [7676] INFO: Waiting for application startup. 2023-09-06 00:16:42,197 INFO - default | fastapi | - `base_handler` waiting for messages INFO: Application startup complete. Queueing a job Using Rabbit MQ INFO: 127.0.0.1:54138 - "POST /external-queue HTTP/1.1" 200 OK 2023-09-06 00:16:44,975 INFO - default | fastapi | 3b2296cc8a - Received This is the queue result :: 47 2023-09-06 00:16:44,979 INFO - default | fastapi | 3b2296cc8a - Processed Queueing a job Using Rabbit MQ INFO: 127.0.0.1:54138 - "POST /external-queue HTTP/1.1" 200 OK 2023-09-06 00:16:45,227 INFO - default | fastapi | 0a9c20746a - Received This is the queue result :: 98
Включив RabbitMQ в свою архитектуру, вы снабжаете свою систему надежной и отказоустойчивой основой обмена сообщениями, повышая ее отказоустойчивость и скорость реагирования.
Полный код можно найти на моем GitHub.
Спасибо, что дочитали до конца. Пожалуйста, подумайте о том, чтобы подписаться на автора и эту публикацию. Посетите Stackademic, чтобы узнать больше о том, как мы демократизируем бесплатное образование в области программирования во всем мире.