Недавно я работал над домашним проектом и решил протестировать фреймворк, о котором много слышал, но еще не тестировал сам - FastAPI. Сначала это был небольшой проект с несколькими конечными точками и интерфейсом React Frontend, но потом я почувствовал, что для него было правильным получить настоящую классную архитектуру. Итак, пришло время разбить его на микросервисы.

И, конечно же, одним из основных вопросов, которые нужно было решить, был ответ на вопрос: как я хочу, чтобы они общались? По разным причинам я выбрал RabbitMQ в качестве брокера событий и сообщений. Поскольку мои микросервисы были довольно активными, они были нужны мне как для потребления, так и для создания сообщений. И вот тут пришла хитрая часть. Я просмотрел документацию в FastAPI и RabbitMQ, приложил все усилия, чтобы реализовать RPC, как это было указано в документации, но он все еще не работал. Наконец, мне удалось заставить его работать в обоих направлениях, для публикации и использования в одном приложении FastAPI. Поскольку мне не удалось найти в Интернете полностью работающее решение, я решил, что стоит поделиться с вами своим опытом. Я пропущу некоторые части, такие как установка python, venv и RabbitMQ, настройка логгеров, чтобы они были краткими и по существу.

Прежде всего необходимо настроить клиент Pika, который будет обрабатывать всю связь с RabbitMQ:

pika_client.py

class PikaClient:

    def __init__(self, process_callable):
        self.publish_queue_name = env('PUBLISH_QUEUE', 'foo_publish_queue')
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=env('RABBIT_HOST', '127.0.0.1'))
        )
        self.channel = self.connection.channel()
        self.publish_queue = self.channel.queue_declare(queue=self.publish_queue_name)
        self.callback_queue = self.publish_queue.method.queue
        self.response = None
        self.process_callable = process_callable
        logger.info('Pika connection initialized')

Наиболее важными здесь являются:

  • process_callable - вызываемый обратный вызов, который будет обрабатывать фактическую бизнес-логику для обработки входящего сообщения;
  • PUBLISH_QUEUE - имя очереди, куда мы будем отправлять наши исходящие сообщения.

Затем нам нужно добавить метод для настройки потребителя для входящих сообщений. Вот как это будет выглядеть:

pika_client.py (продолжение)

async def consume(self, loop):
    """Setup message listener with the current running loop"""
    connection = await connect_robust(host=env('RABBIT_HOST', '127.0.0.1'),
                                      port=5672,
                                      loop=loop)
    channel = await connection.channel()
    queue = await channel.declare_queue(env('CONSUME_QUEUE', 'foo_consume_queue'))
    await queue.consume(self.process_incoming_message, no_ack=False)
    logger.info('Established pika async listener')
    return connection

На что стоит обратить внимание:

  • CONSUME_QUEUE - имя очереди, из которой мы будем получать входящие сообщения;
  • Я решил настроить потребителя с подтверждением получения.

Затем мы добавляем еще один важный метод обработки входящего сообщения:

pika_client.py (продолжение)

async def process_incoming_message(self, message):
    """Processing incoming message from RabbitMQ"""
    message.ack()
    body = message.body
    logger.info('Received message')
    if body:
        self.process_callable(json.loads(body))

Собственно, ничего особенного здесь нет.

И, конечно же, должен быть способ отправки. Я решил использовать здесь простое соединение pika, чтобы показать, что использование синхронного клиента также возможно. Не стесняйтесь повторно использовать подключение из метода потребления.

pika_client.py (продолжение)

def send_message(self, message: dict):
    """Method to publish message to RabbitMQ"""
    self.channel.basic_publish(
        exchange='',
        routing_key=self.publish_queue_name,
        properties=pika.BasicProperties(
            reply_to=self.callback_queue,
            correlation_id=str(uuid.uuid4())
        ),
        body=json.dumps(message)
    )

Хорошо, теперь пора настроить наше приложение. Прежде всего, мы создаем класс для нашего приложения и добавляем метод, который регистрирует входящее сообщение в наш стандартный вывод. Обратите внимание, что мы инициализируем экземпляр класса PikaClient, созданный выше.

test_app.py

class FooApp(FastAPI):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.pika_client = PikaClient(self.log_incoming_message)

    @classmethod
    def log_incoming_message(cls, message: dict):
        """Method to do something meaningful with the incoming message"""
        logger.info('Here we got incoming message %s', message)

А теперь самое важное: мы настраиваем приложение и создаем обработчик событий запуска, который на самом деле выполняет магию прослушивания сообщений под капотом. Получаем текущий рабочий цикл и создаем и ждем задачу потребления.

test_app.py (продолжение)

foo_app = FooApp()
foo_app.include_router(router)


@foo_app.on_event('startup')
async def startup():
    loop = asyncio.get_running_loop()
    task = loop.create_task(foo_app.pika_client.consume(loop))
    await task

Вот и все. Все, что у нас осталось, - это схема маршрутизатора и парсера полезной нагрузки для обработки вызова API, которую мы будем использовать для тестирования нашего приложения.

router.py

router = APIRouter(
    tags=['items'],
    responses={404: {"description": "Page not found"}}
)


@router.post('/send-message')
async def send_message(payload: MessageSchema, request: Request):
    request.app.pika_client.send_message(
        {"message": payload.message}
    )
    return {"status": "ok"}

schema.py

from pydantic import BaseModel


class MessageSchema(BaseModel):
    message: str

И файл request.http для тестирования нашего API. Я использую Pycharm, так что вот содержимое файла, если вы используете Postman, тоже ничего страшного.

POST http://127.0.0.1:8001/send-message
Accept: application/json

{
"message": "I test sending messages"
}

Все, что нам нужно сделать, это stat rabbitmq и наше приложение. Я использовал uvicorn, поэтому команда запуска выглядит следующим образом (настраивает клиент api на порт 8001 с функцией автоматической перезагрузки из папки src, на случай, если вы что-то измените на лету):

test_app: foo_app - reload - reload-dir .src / - порт 8001

После запуска клиента я вижу в своем стандартном выводе следующее:

Сначала мы видим сообщение с установленным соединением в вызове Pika init, а затем в событии запуска самого приложения.

Если мы назовем наш POST http://127.0.0.1:8001/send-message и взглянем на админку RabbitMQ, мы увидим:

Для нас созданы 2 новые очереди. Было опубликовано одно сообщение.

И если мы попытаемся получить сообщение, мы увидим фактическое сообщение {«Я тестирую отправку сообщений»}. Итак, наш вызов API сработал.

Теперь, если мы опубликуем другое сообщение в foo_consume_queue

Мы видим, что он был получен и зарегистрирован нашей функцией обратного вызова.

Теперь вы знаете, как настроить работающий клиент FastAPI с потребителем и слушателем RabbitMQ в одном приложении.