Как создать приложение Python с двумя потоками, в каждом из которых есть приложение автобана

Я не нашел никакого решения для моей проблемы. Мне нужно создать приложение Python с двумя потоками, каждый из которых подключен к маршрутизатору WAMP с использованием библиотеки автобанов.

Следуйте за мной, я пишу код своего эксперимента:

wampAddress = 'ws://172.17.3.139:8181/ws'
wampRealm = 's4t'

from threading import Thread
from autobahn.twisted.wamp import ApplicationRunner
from autobahn.twisted.wamp import ApplicationSession
from twisted.internet.defer import inlineCallbacks


class AutobahnMRS(ApplicationSession):
    @inlineCallbacks
    def onJoin(self, details):
        print("Sessio attached [Connect to WAMP Router]")

        def onMessage(*args):
            print args
        try:
            yield self.subscribe(onMessage, 'test')
            print ("Subscribed to topic: test")

        except Exception as e:
            print("Exception:" +e)

class AutobahnIM(ApplicationSession):
    @inlineCallbacks
    def onJoin(self, details):
        print("Sessio attached [Connect to WAMP Router]")

        try:
            yield self.publish('test','YOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOOO')
            print ("Subscribed to topic: test")

        except Exception as e:
            print("Exception:" +e)

class ManageRemoteSystem:
    def __init__(self):
        self.runner = ApplicationRunner(url= wampAddress, realm = wampRealm)

    def start(self):
        self.runner.run(AutobahnMRS);


class InternalMessages:
    def __init__(self):
        self.runner = ApplicationRunner(url= wampAddress, realm = wampRealm)

    def start(self):
        self.runner.run(AutobahnIM);

#class S4tServer:

if __name__ == '__main__':
    server = ManageRemoteSystem()
    sendMessage = InternalMessages()

    thread1 = Thread(target = server.start())
    thread1.start()
    thread1.join()

    thread2 = Thread(target = sendMessage.start())
    thread2.start()
    thread2.join()

Когда я запускаю это приложение python, запускается только поток 1, а позже, когда я убиваю приложение (ctrl-c), отображаются следующие сообщения об ошибках:

Sessio attached [Connect to WAMP Router]
Subscribed to topic: test
^CTraceback (most recent call last):
  File "test_pub.py", line 71, in <module>
    p2 = multiprocessing.Process(target = server.start())
  File "test_pub.py", line 50, in start
    self.runner.run(AutobahnMRS);
  File "/usr/local/lib/python2.7/dist-packages/autobahn/twisted/wamp.py", line 175, in run
    reactor.run()
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1191, in run
    self.startRunning(installSignalHandlers=installSignalHandlers)
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 1171, in startRunning
    ReactorBase.startRunning(self)
  File "/usr/local/lib/python2.7/dist-packages/twisted/internet/base.py", line 683, in startRunning
    raise error.ReactorNotRestartable()
twisted.internet.error.ReactorNotRestartable

Мне нужно реализовать в одном приложении, которое имеет свои функции, а также должно иметь систему для связи с маршрутизатором WAMP с библиотекой автобана Python.

Другими словами, мне нужно решение, способное взаимодействовать с маршрутизатором WAMP, но в то же время это приложение не должно быть заблокировано частью автобана (я думаю, что решение состоит в том, чтобы запустить два потока, один поток управляет некоторыми функциями и второй поток управляет частью автобана).

Со схемой, которую я предложил ранее, есть еще одна проблема, необходимость отправки сообщения, в определенной теме на маршрутизаторе WAMP, из прикладной части в «нет потока автобана», этот функционал должен вызываться через конкретную функцию без блокировки других функций.

Надеюсь, я рассказал все подробности.

Большое спасибо за любой ответ

--------------------------------РЕДАКТИРОВАТЬ----------------- ----------------

После некоторых исследований я реализовал то, что мне нужно для протокола websocket, код выглядит следующим образом:

# ----- twisted ----------
class _WebSocketClientProtocol(WebSocketClientProtocol):
    def __init__(self, factory):
        self.factory = factory

    def onOpen(self):
        #log.debug("Client connected")
        self.factory.protocol_instance = self
        self.factory.base_client._connected_event.set()

class _WebSocketClientFactory(WebSocketClientFactory):
    def __init__(self, *args, **kwargs):
        WebSocketClientFactory.__init__(self, *args, **kwargs)
        self.protocol_instance = None
        self.base_client = None

    def buildProtocol(self, addr):
        return _WebSocketClientProtocol(self)
# ------ end twisted -------
lass BaseWBClient(object):

    def __init__(self, websocket_settings):
        #self.settings = websocket_settings
        # instance to be set by the own factory
        self.factory = None
        # this event will be triggered on onOpen()
        self._connected_event = threading.Event()
        # queue to hold not yet dispatched messages
        self._send_queue = Queue.Queue()
        self._reactor_thread = None

    def connect(self):

        log.msg("Connecting to host:port")
        self.factory = _WebSocketClientFactory(
                                "ws://host:port",
                                debug=True)
        self.factory.base_client = self

        c = connectWS(self.factory)

        self._reactor_thread = threading.Thread(target=reactor.run,
                                               args=(False,))
        self._reactor_thread.daemon = True
        self._reactor_thread.start()

    def send_message(self, body):
        if not self._check_connection():
            return
        log.msg("Queing send")
        self._send_queue.put(body)
        reactor.callFromThread(self._dispatch)

    def _check_connection(self):
        if not self._connected_event.wait(timeout=10):
            log.err("Unable to connect to server")
            self.close()
            return False
        return True

    def _dispatch(self):
        log.msg("Dispatching")
        while True:
            try:
                body = self._send_queue.get(block=False)
            except Queue.Empty:
                break
            self.factory.protocol_instance.sendMessage(body)

    def close(self):
        reactor.callFromThread(reactor.stop)

import time
def Ppippo(coda):
        while True:
            coda.send_message('YOOOOOOOO')
            time.sleep(5)

if __name__ == '__main__':

    ws_setting = {'host':'', 'port':}

    client = BaseWBClient(ws_setting)

    t1 = threading.Thread(client.connect())
    t11 = threading.Thread(Ppippo(client))
    t11.start()
    t1.start()

Предыдущий код работает нормально, но мне нужно перевести его для работы с протоколом WAMP, вставленным в веб-сокет.

Кто-нибудь знает, как я решаю?


person alotronto    schedule 09.02.2015    source источник
comment
Переместите thread1.join() вниз, чтобы быть с thread2.join(). В своем текущем местоположении он скажет основному потоку подождать, пока поток 1 не умрет. Поскольку у вас нет возможности убить поток (без уничтожения всего процесса с помощью Ctrl-C), второй поток никогда не будет создан.   -  person Aaron D    schedule 09.02.2015
comment
Кроме того, ваши потоки должны выполнять свою работу в функции потока .run(). Вы join() создаете поток в конце своей основной функции, чтобы дать потокам время завершить свое выполнение до закрытия основного приложения. Таким образом, вам нужно иметь возможность для потоков завершить свою задачу.   -  person Aaron D    schedule 09.02.2015
comment
Вам нужно 2 сеанса приложения или действительно 2 потока? Если первое, оба сеанса относятся к одному и тому же маршрутизатору/области или к разным? Если второе, то зачем вообще нужны темы? Если вам нужно сделать, например. Материал, интенсивно использующий процессор, и вы хотите использовать многоядерный процессор, пожалуйста, сообщите нам об этом. Нужно больше, почему и что...   -  person oberstet    schedule 09.02.2015
comment
oberstet спасибо за ваш ответ. Я изменил вопрос с более подробной информацией.   -  person alotronto    schedule 10.02.2015


Ответы (1)


Плохая новость заключается в том, что Autobahn использует основной цикл Twisted, поэтому вы не можете запустить его в двух потоках одновременно.

Хорошая новость заключается в том, что вам не нужно запускать его в двух потоках для выполнения двух вещей, а два потока в любом случае будут более сложными.

API для начала работы с несколькими приложениями немного сбивает с толку, потому что у вас есть два объекта ApplicationRunner, и на первый взгляд кажется, что вы запускаете приложение в автобане, вызывая ApplicationRunner.run.

Тем не менее, ApplicationRunner — это просто удобство, обертывающее то, что настраивает приложение, и то, что запускает основной цикл; настоящее мясо работы происходит в WampWebSocketClientFactory.

Чтобы достичь того, чего вы хотите, вам просто нужно избавиться от потоков и запустить основной цикл самостоятельно, заставив экземпляры ApplicationRunner просто настроить свои приложения.

Чтобы добиться этого, вам нужно изменить последнюю часть вашей программы, чтобы сделать это:

class ManageRemoteSystem:
    def __init__(self):
        self.runner = ApplicationRunner(url=wampAddress, realm=wampRealm)

    def start(self):
        # Pass start_reactor=False to all runner.run() calls
        self.runner.run(AutobahnMRS, start_reactor=False)


class InternalMessages:
    def __init__(self):
        self.runner = ApplicationRunner(url=wampAddress, realm=wampRealm)

    def start(self):
        # Same as above
        self.runner.run(AutobahnIM, start_reactor=False)


if __name__ == '__main__':
    server = ManageRemoteSystem()
    sendMessage = InternalMessages()
    server.start()
    sendMessage.start()

    from twisted.internet import reactor
    reactor.run()
person Glyph    schedule 09.02.2015
comment
API немного отстой, да, особенно для этого варианта использования. У нас есть неопубликованные материалы в другом репо, которые позволяют запускать несколько сеансов с помощью одного вызова, который возвращает DeferredList (который разрешается с помощью отдельных сеансов приложения WAMP). Вероятно, это должно быть в Autobahn.. - person oberstet; 09.02.2015
comment
Извините, оберстет, а можно поточнее проиллюстрировать это - person alotronto; 23.02.2015
comment
Спасибо @Глиф!!! Я целую вечность искал этот аргумент start_reactor, но, кажется, в документах он не упоминается... или как иначе можно добавить автобан в существующее приложение Twisted? - person jjmontes; 11.09.2015
comment
@jjmontes - я уверен, что это единственный способ. Баг оберстет за обновлениями ;-). - person Glyph; 11.09.2015