Может ли eventlet управлять соединением AMQP с асинхронной передачей сообщений как на входе, так и на выходе?

Фактический дизайн:

Для тех, кто возвращается к этому вопросу, полезный ответ ниже подтолкнул меня к созданию работоспособного дизайна, который работает нормально. Ключевыми были три вывода:

  1. Eventlet - очень безопасная среда - если два гринлета одновременно пытаются recv() или оба пытаются send() с одного и того же сокета, то Eventlet элегантно уничтожает второй гринлет с исключением. Это великолепно и означает, что простые исключения, а также возможные ошибки чередования данных, которые невозможно воспроизвести, будут результатом, если amqplib плохо «согреет».
  2. The amqplib methods fall roughly into two groups: wait() loops inside of recv() until an AMQP message is assembled, while other methods send() messages back and will not attempt their own recv(). This is stunningly good luck, given that the amqplib authors had no idea that someone would try to “green” their library! It means that message sending is not only safe from the callback invoked by wait(), but that messages can also be sent safely from other greenlets that are completely outside the control of the wait() loop. These safe methods — that can be called from any greenlet, not just from the wait() callback — are:
    1. basic_ack
    2. basic_consume с nowait=True
    3. basic_publish
    4. basic_recover
    5. basic_reject
    6. exchange_declare с nowait=True
    7. exchange_delete с nowait=True
    8. queue_bind с nowait=True
    9. queue_unbind с nowait=True
    10. queue_declare с nowait=True
    11. queue_delete с nowait=True
    12. queue_purge с nowait=True
  3. Семафоры можно использовать в качестве блокировок: инициализируйте семафор счетчиком 1, а затем acquire() и release() для блокировки и разблокировки. Все мои асинхронные гринлеты, которые хотят писать сообщения, могут использовать такую ​​блокировку, чтобы их отдельные send() вызовы не чередовались и не разрушали протокол AMQP.

Итак, мой код выглядит примерно так:

amqp = eventlet.patcher.import_patched('amqplib.client_0_8')

class Processor(object):
    def __init__(self):
        write_lock = eventlet.semaphore.Semaphore(1)

    def listening_greenlet(channel):
        # start this using eventlet.spawn_n()
        # create Connection and self.channel
        self.channel.basic_consume(queue, callback=self.consume)
        while True:
            self.channel.wait()

    def safe_publish(channel, *args, **kw):
        with write_lock:  # yes, Eventlet supports this!
            channel.basic_publish(*args, **kw)     

    def consume(message):
        # Returning immediately frees the wait() loop
        eventlet.spawn_n(self.process, message)

    def process(message):
        # do whatever I want
        # whenever I am done, I can async reply:
        self.safe_publish(...)

Наслаждаться!

Исходный вопрос:

Представьте себе сотни сообщений AMQP, поступающих каждую минуту в небольшое приложение Python Eventlet, каждое из которых требует обработки и ответа - где Издержки процессора при обработке будут минимальными, но могут потребовать ожидания ответов от других служб и сокетов.

Чтобы разрешить одновременную обработку, скажем, 100 сообщений, я, конечно, мог бы развернуть 100 отдельных TCP-соединений с RabbitMQ и иметь рабочего для каждого соединения, которое принимает, обрабатывает и отвечает на отдельные сообщения в режиме блокировки. Но чтобы сохранить TCP-соединения, я бы предпочел создать только одно соединение AMQP, разрешить RabbitMQ передавать сообщения по каналу на меня на полной скорости, передавать эти задачи рабочим и отправлять ответы обратно, когда каждый рабочий завершает:

                                       +--------+
                                +------| worker | <-+
                                |      +--------+   |
                                |      +--------+   |
                                | +----| worker | <-+
                                | |    +--------+   |
                                | |    +--------+   |
                                | | +--| worker | <-+
                                | | |  +--------+   |
                                v v v               |
                           +------------+           |
 RabbitMQ <-AMQP-> socket--| dispatcher |-----------+
                           +------------+

Обратите внимание:

  • Очередь Eventlet может элегантно распределять входящую работу между рабочими, когда они становятся доступными для дополнительной работы.
  • Возможно даже управление потоком из RabbitMQ: я могу ACK-сообщения только до тех пор, пока все мои рабочие не будут заняты, а затем ждать перед отправкой дальнейших ACK, пока очередь не начнет опустошаться.
  • Работа почти наверняка будет завершена не по порядку: один запрос может завершиться быстро, в то время как другое событие, пришедшее раньше, займет гораздо больше времени; а некоторые запросы могут вообще никогда не завершиться; поэтому рабочие будут возвращать ответы в непредсказуемом и асинхронном порядке.

Я планировал написать это с помощью Eventlet и py-amqplib после просмотра этого привлекательного сообщения в блоге о том, как легко эту библиотеку AMQP можно включить в модель обработки Eventlet:

http://blog.eventlet.net/2010/02/09/multiple-concurrent-connections-with-py-amqplib-and-eventlet/

Моя проблема в том, что, прочитав документацию для обеих библиотек, исходного кода amqplib и большей части исходного кода Eventlet, я не могу понять, как я могу научить eventlet, которому принадлежит соединение AMQP - eventlet с именем connect_to_host() в сообщении в блоге - чтобы также просыпаться, когда рабочий завершает свою работу и генерирует ответ. Метод wait() в amqplib может быть разбужен только активностью в сокете AMQP. Хотя мне кажется, что я должен иметь возможность, чтобы рабочие записывали свои ответы в очередь, а событие connect_to_host() просыпалось либо, когда приходит новое входящее сообщение, или когда рабочий готов с ответом для отправки, я не могу найти способ для eventlet сказать «разбуди меня, когда любое из этих событий произойдет».

Мне действительно пришло в голову, что рабочие могут попытаться присвоить объект соединения AMQP - или даже необработанный сокет - и записать свои собственные сообщения обратно через TCP; но кажется, что блокировки были бы необходимы, чтобы предотвратить чередование исходящих рабочих сообщений друг с другом или с сообщениями ACK, написанными основным событием слушателя, и я также не могу найти, где блокировки доступны в Eventlet.

Все это дает мне почти уверенность в том, что я пытаюсь решить эту проблему как-то в обратном направлении. Неужели такая проблема - разрешение безопасного совместного использования одного соединения между слушателем-диспетчером и многими рабочими процессами - просто не соответствует модели сопрограмм и требует полноценной асинхронной библиотеки? (В каком случае: есть ли тот, который вы бы порекомендовали для этой проблемы, и как будет происходить мультиплексирование между входящими сообщениями и исходящими ответами рабочих? Сегодня я не нашел чистого решения, попробовав такие комбинации, как Pika + ioloop, хотя я только что видел другой библиотека, stformed_amqp, которая могла бы работать лучше, чем это сделала Pika.) Или мне действительно нужно вернуться к реальным живым потокам Python, если мне нужен чистый и поддерживаемый код, который может реализовать эту модель? Я открыт для всех вариантов.

Спасибо за любую помощь или идеи! Я продолжаю думать, что у меня в значительной степени не работает весь параллелизм в Python, а затем я снова узнаю, что это не так. :) И я надеюсь, что в любом случае вам понравился рисунок ASCII выше.


person Brandon Rhodes    schedule 02.11.2011    source источник
comment
Это очень полезный вопрос даже спустя три года. Почему блокировка, которую вы приобретаете около basic_publish, не требуется для basic_ack, basic_reject и других «безопасных» методов, которые вы перечисляете? Причина в том, что без отправляемого тела сообщение настолько мало, что вы гарантированно сможете завершить запись в сокет, не уступая другому гринлету?   -  person Matt    schedule 31.12.2014


Ответы (1)


После прочтения вашего сообщения и работы с gevent подобной библиотекой, как eventlet, мне стало ясно несколько вещей, потому что я только что решил аналогичную проблему.

В общем, нет необходимости в блокировке, так как когда-либо одновременно работает только один eventlet или greenlet, пока ни один из них не блокирует, все, кажется, выполняется одновременно .. НО вы не хотите отправлять данные через сокет, пока другой гринлет отправляет. Вы правы, и для этого действительно нужна блокировка.

Если у меня есть подобные вопросы, поиска в документации будет недостаточно ... посмотрите исходники! его открытый исходный код в любом случае вы узнаете тонну больше, глядя на код других людей.

вот несколько упрощенных примеров кода, которые могут прояснить вам ситуацию.

в вашем диспетчере 2 очереди

self.worker_queue = Queue() # queue for messages to workers
self.server_queue = Queue() # queue for messages to ampq server

пусть рабочие помещают свой результат в очередь сервера.

код отправки и получения

def send_into_ampq():
    while True:
       message = dispatcher.get_workger_msg()

       try:
          connection.send(self.encode(message))
       except:
           connection.kill()

def read_from_ampq():
    while True:
        message = connection.wait()

        dispatcher.put_ampq_msg(self.decode(message))

в функции отправки кода подключения

self._writelock = Semaphore(1) 
# this is a gevent locking thing. eventlet must have something like this too..
# just counts - 1 for locks and +1 for releases blocks otherwise blocks until 
# 0 agian.... why not google it i though.. and yes im right:
# eventlet.semaphore.Semaphore(value=1)

def send(self, message):
    """
    you need a write lock to prevent more greenlets
    sending more messages when previous sent is not done yet.
    """

    with self._writelock:
        self.socket.sendall(message)
person Stephan    schedule 02.11.2011
comment
СПАСИБО - я забыл поискать более общие механизмы, которые можно было бы использовать в качестве блокировок, и просто искал «блокировку» в документации. Итак, правила будут выглядеть так? (1) Допускается ТОЛЬКО один ивентлет wait(). (2) Другие eventlet должны использовать nowait=True, если они делают что-то вроде создания или удаления очередей, поэтому два eventlet не прослушивают входящие данные одновременно и, возможно, чередуют фрагменты входящих сообщений. (3) Операции, которые записывают ИСХОДЯЩИЕ данные, должны быть защищены друг от друга семафорной блокировкой. Правильно? - person Brandon Rhodes; 02.11.2011
comment
1. пусть только один ивентлет будет ждать нового полного сообщения от сервера. 2. когда рабочие помещают сообщения в очередь сервера, используйте nowait = True. 3. Чтобы несколько сообщений не отправлялись одновременно через сокет, используйте семафор. - person Stephan; 02.11.2011
comment
О, вау - если я забуду nowait=True, тогда Eventlet будет жаловаться: «RuntimeError: обнаружено второе одновременное чтение файла no 3» - так что Eventlet намного безопаснее, чем я предполагал. - person Brandon Rhodes; 02.11.2011
comment
не думаю, что вам нужно nowait = True на самом деле ... просто пусть ваши рабочие получают и помещают сообщения в очередь. - person Stephan; 03.11.2011
comment
Что ж, я получаю указанное исключение, если nowait=True отсутствует в коде. :) (Что имеет смысл: операция, которая не «ждет», на самом деле выполняет recv() в сокете, чтобы вернуть сообщение об успехе или неудаче, которое конкурирует с recv(), которое уже выполняется внутри вызова wait() в основном цикл слушателя.) - person Brandon Rhodes; 03.11.2011