Фактический дизайн:
Для тех, кто возвращается к этому вопросу, полезный ответ ниже подтолкнул меня к созданию работоспособного дизайна, который работает нормально. Ключевыми были три вывода:
- Eventlet - очень безопасная среда - если два гринлета одновременно пытаются
recv()
или оба пытаютсяsend()
с одного и того же сокета, то Eventlet элегантно уничтожает второй гринлет с исключением. Это великолепно и означает, что простые исключения, а также возможные ошибки чередования данных, которые невозможно воспроизвести, будут результатом, еслиamqplib
плохо «согреет». - The
amqplib
methods fall roughly into two groups:wait()
loops inside ofrecv()
until an AMQP message is assembled, while other methodssend()
messages back and will not attempt their ownrecv()
. This is stunningly good luck, given that theamqplib
authors had no idea that someone would try to “green” their library! It means that message sending is not only safe from the callback invoked bywait()
, but that messages can also be sent safely from other greenlets that are completely outside the control of thewait()
loop. These safe methods — that can be called from any greenlet, not just from thewait()
callback — are:basic_ack
basic_consume
сnowait=True
basic_publish
basic_recover
basic_reject
exchange_declare
сnowait=True
exchange_delete
сnowait=True
queue_bind
сnowait=True
queue_unbind
сnowait=True
queue_declare
сnowait=True
queue_delete
сnowait=True
queue_purge
сnowait=True
- Семафоры можно использовать в качестве блокировок: инициализируйте семафор счетчиком
1
, а затемacquire()
иrelease()
для блокировки и разблокировки. Все мои асинхронные гринлеты, которые хотят писать сообщения, могут использовать такую блокировку, чтобы их отдельныеsend()
вызовы не чередовались и не разрушали протокол AMQP.
Итак, мой код выглядит примерно так:
amqp = eventlet.patcher.import_patched('amqplib.client_0_8')
class Processor(object):
def __init__(self):
write_lock = eventlet.semaphore.Semaphore(1)
def listening_greenlet(channel):
# start this using eventlet.spawn_n()
# create Connection and self.channel
self.channel.basic_consume(queue, callback=self.consume)
while True:
self.channel.wait()
def safe_publish(channel, *args, **kw):
with write_lock: # yes, Eventlet supports this!
channel.basic_publish(*args, **kw)
def consume(message):
# Returning immediately frees the wait() loop
eventlet.spawn_n(self.process, message)
def process(message):
# do whatever I want
# whenever I am done, I can async reply:
self.safe_publish(...)
Наслаждаться!
Исходный вопрос:
Представьте себе сотни сообщений AMQP, поступающих каждую минуту в небольшое приложение Python Eventlet, каждое из которых требует обработки и ответа - где Издержки процессора при обработке будут минимальными, но могут потребовать ожидания ответов от других служб и сокетов.
Чтобы разрешить одновременную обработку, скажем, 100 сообщений, я, конечно, мог бы развернуть 100 отдельных TCP-соединений с RabbitMQ и иметь рабочего для каждого соединения, которое принимает, обрабатывает и отвечает на отдельные сообщения в режиме блокировки. Но чтобы сохранить TCP-соединения, я бы предпочел создать только одно соединение AMQP, разрешить RabbitMQ передавать сообщения по каналу на меня на полной скорости, передавать эти задачи рабочим и отправлять ответы обратно, когда каждый рабочий завершает:
+--------+
+------| worker | <-+
| +--------+ |
| +--------+ |
| +----| worker | <-+
| | +--------+ |
| | +--------+ |
| | +--| worker | <-+
| | | +--------+ |
v v v |
+------------+ |
RabbitMQ <-AMQP-> socket--| dispatcher |-----------+
+------------+
Обратите внимание:
- Очередь Eventlet может элегантно распределять входящую работу между рабочими, когда они становятся доступными для дополнительной работы.
- Возможно даже управление потоком из RabbitMQ: я могу ACK-сообщения только до тех пор, пока все мои рабочие не будут заняты, а затем ждать перед отправкой дальнейших ACK, пока очередь не начнет опустошаться.
- Работа почти наверняка будет завершена не по порядку: один запрос может завершиться быстро, в то время как другое событие, пришедшее раньше, займет гораздо больше времени; а некоторые запросы могут вообще никогда не завершиться; поэтому рабочие будут возвращать ответы в непредсказуемом и асинхронном порядке.
Я планировал написать это с помощью Eventlet и py-amqplib после просмотра этого привлекательного сообщения в блоге о том, как легко эту библиотеку AMQP можно включить в модель обработки Eventlet:
http://blog.eventlet.net/2010/02/09/multiple-concurrent-connections-with-py-amqplib-and-eventlet/
Моя проблема в том, что, прочитав документацию для обеих библиотек, исходного кода amqplib и большей части исходного кода Eventlet, я не могу понять, как я могу научить eventlet, которому принадлежит соединение AMQP - eventlet с именем connect_to_host()
в сообщении в блоге - чтобы также просыпаться, когда рабочий завершает свою работу и генерирует ответ. Метод wait()
в amqplib может быть разбужен только активностью в сокете AMQP. Хотя мне кажется, что я должен иметь возможность, чтобы рабочие записывали свои ответы в очередь, а событие connect_to_host()
просыпалось либо, когда приходит новое входящее сообщение, или когда рабочий готов с ответом для отправки, я не могу найти способ для eventlet сказать «разбуди меня, когда любое из этих событий произойдет».
Мне действительно пришло в голову, что рабочие могут попытаться присвоить объект соединения AMQP - или даже необработанный сокет - и записать свои собственные сообщения обратно через TCP; но кажется, что блокировки были бы необходимы, чтобы предотвратить чередование исходящих рабочих сообщений друг с другом или с сообщениями ACK, написанными основным событием слушателя, и я также не могу найти, где блокировки доступны в Eventlet.
Все это дает мне почти уверенность в том, что я пытаюсь решить эту проблему как-то в обратном направлении. Неужели такая проблема - разрешение безопасного совместного использования одного соединения между слушателем-диспетчером и многими рабочими процессами - просто не соответствует модели сопрограмм и требует полноценной асинхронной библиотеки? (В каком случае: есть ли тот, который вы бы порекомендовали для этой проблемы, и как будет происходить мультиплексирование между входящими сообщениями и исходящими ответами рабочих? Сегодня я не нашел чистого решения, попробовав такие комбинации, как Pika + ioloop, хотя я только что видел другой библиотека, stformed_amqp, которая могла бы работать лучше, чем это сделала Pika.) Или мне действительно нужно вернуться к реальным живым потокам Python, если мне нужен чистый и поддерживаемый код, который может реализовать эту модель? Я открыт для всех вариантов.
Спасибо за любую помощь или идеи! Я продолжаю думать, что у меня в значительной степени не работает весь параллелизм в Python, а затем я снова узнаю, что это не так. :) И я надеюсь, что в любом случае вам понравился рисунок ASCII выше.
basic_publish
, не требуется дляbasic_ack
,basic_reject
и других «безопасных» методов, которые вы перечисляете? Причина в том, что без отправляемого тела сообщение настолько мало, что вы гарантированно сможете завершить запись в сокет, не уступая другому гринлету? - person Matt   schedule 31.12.2014