Выполнить действия в основном цикле Tornado после его запуска

Я создаю веб-сервер python3 tornado, который может прослушивать брокера MQTT и всякий раз, когда прослушивает новое сообщение от него, транслирует его в подключенные браузеры через веб-сокеты. Однако похоже, что Tornado не любит вызовы своего API из потока, отличного от IOLoop.current(), и я не могу найти другого решения...

Я уже пытался написать код. Я поместил весь клиент MQTT (в данном случае называемый клиентом PMCU) в отдельный поток, который зацикливается и прослушивает уведомления MQTT.

def on_pmcu_data(data):
    for websocket_client in websocket_clients:
        print("Sending websocket message")
        websocket_client.write_message(data)  # Here it stuck!
        print("Sent")

class WebSocketHandler(tornado.websocket.WebSocketHandler):
    def open(self):
        websocket_clients.append(self)

    def on_close(self):
        websocket_clients.remove(self)

def make_app():
    return tornado.web.Application([
        (r'/ws', WebSocketHandler)
    ])

if __name__ == "__main__":
    main_loop = IOLoop().current()

    pmcu_client = PMCUClient(on_pmcu_data)
    threading.Thread(target=lambda: pmcu_client.listen("5.4.3.2")).start()

    app = make_app()
    app.listen(8080)
    main_loop.start()

Однако, как я уже сказал, кажется, что вызовы Tornado API находятся вне блоков IOLoop.current(): приведенный выше код печатает только Sending websocket message.

Я намерен запустить websocket_client.write_message(data) в цикле событий IOLoop.current(). Но кажется, что функция IOLoop.current().spawn_callback(lambda: websocket_client.write_message(data)) не работает после запуска IOLoop.current(). Как я мог этого добиться?

Я знаю, что у меня огромное непонимание IOLoop, asyncio, от которого он зависит, и async python3.


person ruta    schedule 26.04.2019    source источник


Ответы (1)


on_pmcu_data вызывается в отдельном потоке, но веб-сокет контролируется циклом событий Tornado. Вы не можете писать в веб-сокет из потока, если у вас нет доступа к циклу событий.

Вам нужно будет попросить IOLoop записать данные в веб-сокеты.

Решение 1.

Для простых случаев, если вы не хотите сильно менять код, вы можете сделать это:

if __name__ == "__main__":
    main_loop = IOLoop().current()

    on_pmcu_data_callback = lambda data: main_loop.add_callback(on_pmcu_data, data)

    pmcu_client = PMCUClient(on_pmcu_data_callback)

    ...

Это должно решить вашу проблему.


Решение 2.

В более сложных случаях вы можете передать класс main_loop в класс PMCUClient, а затем использовать add_callback (или spawn_callback) для запуска on_pmcu_data.

Пример:

if __name__ == "__main__":
    main_loop = IOLoop().current()

    pmcu_client = PMCUClient(on_pmcu_data, main_loop) # also pass the main loop

    ...

Затем в PMCUCLient классе:

class PMCUClient:
    def __init__(self, on_pmcu_data, main_loop):
        ...
        self.main_loop = main_loop

    def lister(...):
        ...
        self.main_loop.add_callback(self.on_pmcu_data, data)
person xyres    schedule 28.04.2019