Twisted — как сделать много кода Python неблокирующим

Я пытался заставить этот скрипт выполнять код в hub() в письменном порядке.

hub() содержит смесь стандартного кода Python и запросов на выполнение операций ввода-вывода с использованием Twisted и Crossbar.

Однако, поскольку код Python блокирует, у Reactor нет возможности выполнить эти задачи «публикации». Мой внешний интерфейс получает все опубликованные сообщения в конце.

  1. Этот код представляет собой сильно упрощенную версию того, с чем я на самом деле имею дело. Настоящий скрипт (hub() и другие методы, которые он вызывает) имеет длину более 1500 строк. Модификация всех этих функций, чтобы сделать их неблокирующими, не идеальна. Я бы предпочел изолировать изменения в нескольких методах, таких как publish(), если это возможно для решения этой проблемы.
  2. Я играл с такими терминами, как async, await, deferLater, loopingCall и другими. Я не нашел пример, который помог бы еще в моей ситуации.

Есть ли способ изменить publish() (или hub()), чтобы они отправляли сообщения по порядку?

from autobahn.twisted.component import Component, run
from twisted.internet.defer import inlineCallbacks, returnValue
from twisted.internet import reactor, defer

component = Component(
    transports=[
        {
            u"type": u"websocket",
            u"url": u"ws://127.0.0.1:8080/ws",
            u"endpoint": {
                u"type": u"tcp",
                u"host": u"localhost",
                u"port": 8080,
            },
            u"options": {
                u"open_handshake_timeout": 100,
            }
        },
    ],
    realm=u"realm1",
)

@component.on_join
@inlineCallbacks
def join(session, details):
    print("joined {}: {}".format(session, details))

    def publish(context='output', value='default'):
        """ Publish a message. """
        print('publish', value)
        session.publish(u'com.myapp.universal_feedback', {"id": context, "value": value})

    def hub(thing):
        """ Main script. """
        do_things
        publish('output', 'some data for you')
        do_more_things
        publish('status', 'a progress message')
        do_even_more_things
        publish('status', 'some more data')
        do_all_the_things
        publish('other', 'something else')

    try:
        yield session.register(hub, u'com.myapp.hello')
        print("procedure registered")
    except Exception as e:
        print("could not register procedure: {0}".format(e))


if __name__ == "__main__":
    run([component])
    reactor.run()

person sscirrus    schedule 12.01.2018    source источник


Ответы (2)


Ваша функция join() является асинхронной (украшена @inlineCallbacks и содержит как минимум один yield в теле).

Внутри он регистрирует функцию hub() как WAMP RPC; Однако hub() не является асинхронным.

Также вызовы session.publish() не выполняются, как должны быть асинхронные вызовы.

Результат: вы добавляете кучу событий в цикл событий, но не ждете их, пока не сбросите цикл событий при завершении работы приложения.

person Freek Wiekmeijer    schedule 20.02.2018

Вам нужно сделать свой функциональный центр и опубликовать async.

@inlineCallbacks
def publish(context='output', value='default'):
    """ Publish a message. """
    print('publish', value)
    yield session.publish(u'com.myapp.universal_feedback', {"id": context, "value": value})

@inlineCallbacks
def hub(thing):
    """ Main script. """
    do_things
    yield publish('output', 'some data for you')
    do_more_things
    yield publish('status', 'a progress message')
    do_even_more_things
    yield publish('status', 'some more data')
    do_all_the_things
    yield publish('other', 'something else')
person Oleksandr Yarushevskyi    schedule 10.07.2018