Слушайте ZeroMQ в процессе приложения aiohttp

Я запускаю приложение aiohttp с Gunicorn позади nginx. В модуле инициализации моего приложения я не запускаю приложение с помощью web.run_app(app), а просто создаю экземпляр, который будет импортирован Gunicorn для запуска в каждом рабочем процессе, который создает Gunicorn. Итак, Gunicorn создает несколько рабочих процессов, циклы событий внутри них, а затем запускает обработчик запросов приложения в этих циклах.

В моем приложении aiohttp есть набор подключенных WebSockets (клиентов мобильных приложений), которые я хочу уведомить о событии, произошедшем в любом из процессов приложения, запущенных Gunicorn. И я хочу уведомить все WebSockets, которые подключены ко всем процессам приложений. Поэтому я создаю какой-то восходящий прокси-сервер, используя ZeroMQ, и я хочу подписаться на него, используя сокет zmq.SUB из каждого процесса приложения.

... Итак, в основном я хочу добиться чего-то подобного в каждом работнике приложения:

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://localhost:5555')

while True:
    event = socket.recv()
    for ws in app['websockets']:
        ws.send_bytes(event)
    # break before app shutdown. How?

Как я могу прослушивать прокси-сервер ZeroMQ в приложении aiohttp для пересылки сообщений на WebSockets?

Где я могу поместить этот код для запуска в фоновом режиме в цикле событий и как правильно запустить и завершить его работу в течение aiohttp жизненного цикла приложения?


ОБНОВЛЕНИЕ

Я уже создал проблему в репозитории aiohttp на GitHub с описанием проблемы и предложением возможного решения. . Я был бы очень признателен за вклад здесь или там по описанной проблеме.


person Eugene Naydenov    schedule 12.08.2016    source источник
comment
Я пытаюсь сделать то же самое (объединить ZMQ и aiohttp), но как ваш ответ относится к вопросу? это не о zmq, не так ли?   -  person Alex    schedule 15.06.2019
comment
@Алекс Точно. Вопрос заключался в том, как прослушивать ZeroMQ (или любую другую очередь) в фоновом режиме из того же процесса aiohttp. В то время у aiohttp не было обработчика сигнала on_startup. Но теперь это так, поэтому нет проблем с прослушиванием любого количества очередей в фоновом режиме.   -  person Eugene Naydenov    schedule 15.06.2019


Ответы (1)


Хорошо, вопрос и обсуждение этой проблемы привели к новой функциональности, которую я внес свой вклад в aiohttp, а именно в версии 1.0 у нас появится возможность регистрировать on_startup сигналы приложений с помощью Application.on_startup() метода.

Документация.
Рабочий пример на основной ветке.

#!/usr/bin/env python3
"""Example of aiohttp.web.Application.on_startup signal handler"""
import asyncio

import aioredis
from aiohttp.web import Application, WebSocketResponse, run_app

async def websocket_handler(request):
    ws = WebSocketResponse()
    await ws.prepare(request)
    request.app['websockets'].append(ws)
    try:
        async for msg in ws:
            print(msg)
            await asyncio.sleep(1)
    finally:
        request.app['websockets'].remove(ws)
    return ws


async def on_shutdown(app):
    for ws in app['websockets']:
        await ws.close(code=999, message='Server shutdown')


async def listen_to_redis(app):
    try:
        sub = await aioredis.create_redis(('localhost', 6379), loop=app.loop)
        ch, *_ = await sub.subscribe('news')
        async for msg in ch.iter(encoding='utf-8'):
            # Forward message to all connected websockets:
            for ws in app['websockets']:
                ws.send_str('{}: {}'.format(ch.name, msg))
            print("message in {}: {}".format(ch.name, msg))
    except asyncio.CancelledError:
        pass
    finally:
        print('Cancel Redis listener: close connection...')
        await sub.unsubscribe(ch.name)
        await sub.quit()
        print('Redis connection closed.')


async def start_background_tasks(app):
    app['redis_listener'] = app.loop.create_task(listen_to_redis(app))


async def cleanup_background_tasks(app):
    print('cleanup background tasks...')
    app['redis_listener'].cancel()
    await app['redis_listener']


async def init(loop):
    app = Application(loop=loop)
    app['websockets'] = []
    app.router.add_get('/news', websocket_handler)
    app.on_startup.append(start_background_tasks)
    app.on_cleanup.append(cleanup_background_tasks)
    app.on_shutdown.append(on_shutdown)
    return app

loop = asyncio.get_event_loop()
app = loop.run_until_complete(init(loop))
run_app(app)
person Eugene Naydenov    schedule 06.09.2016