RabbitMQ/Pika блокирует потребителя с помощью дажелета

Я работаю над более крупным приложением, для которого требуется eventlet, а теперь также требуется RabbitMQ. Похоже, что eventlet заставляет потребительский поток pika блокировать выполнение дополнительных рабочих процессов. Я знаю, что Pika не считается потокобезопасным, поэтому у меня есть все, включая соединение, в своем собственном потоке. Я бы предположил, что блокирующее соединение должно блокировать только потребительский поток. Как я могу заставить pika и eventlet работать вместе? В приведенном ниже примере рабочий поток никогда ничего не выводит, но закомментирование eventlet.monkey_patch() позволяет выполнять оба потока.

import threading
import pika

import eventlet
eventlet.monkey_patch()


def callback(ch, method, properties, body):
    print body
    ch.basic_ack(delivery_tag=method.delivery_tag)


def consumer():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='test', durable=True,
                          exclusive=False, auto_delete=False)
    channel.basic_consume(callback, queue='test')
    channel.start_consuming()


def start_consumer_thread():
    # initialize a listener thread
    consumer_thread = threading.Thread(target=consumer)
    consumer_thread.start()

def worker():
    start_consumer_thread()
    for x in range(1,10000):
        print x


x = threading.Thread(target=worker())
x.start()

person user2242044    schedule 04.07.2018    source источник


Ответы (2)


Пика и eventlet.monkey_patch не совместимы. Вам придется использовать eventlet без исправления системных вызовов, если это возможно.


ПРИМЕЧАНИЕ. команда RabbitMQ отслеживает rabbitmq-users список рассылки и лишь иногда отвечает на вопросы в StackOverflow.

person Luke Bakken    schedule 05.07.2018
comment
Спасибо. Есть ли способ периодически проверять сообщение в очереди и использовать его, а не иметь бесконечный цикл блокировки? - person user2242044; 05.07.2018
comment
Не уверен, что это правда. oslo.messaging использует pika, и это afaik, исправляющий обезьяну. Вы всегда должны исправлять обезьяны перед импортом других библиотек. Попробуйте переместить monkey_patch перед импортом pika. - person eandersson; 12.07.2018
comment
@eanandersson - похоже, это больше не так e2407fa53c91fe5c.yaml" rel="nofollow noreferrer">ссылка. Это позор, поскольку Pika намного надежнее, чем kombu или другие библиотеки Python для RabbitMQ. - person Luke Bakken; 12.07.2018
comment
Да, это было ошибкой, когда они представили поддержку pika, они не позволили смешивать драйверы pika и kombu. Поэтому, если вы хотели использовать pika, вам пришлось отключить все облако, чтобы обновить конфигурацию. - person eandersson; 12.07.2018
comment
кстати для надежности! Я предпочитаю свою библиотеку amqp =] - person eandersson; 12.07.2018

Мне удалось заставить потребителя pika работать с eventlet путем внесения исправлений как можно раньше, а также импортировать pika с исправлениями.

Сначала импортируйте и исправьте stdlib:

import eventlet
eventlet.monkey_patch()

Затем импортируйте и пропатчите сам pika:

pika = eventlet.import_patched('pika')

Я использовал эту стратегию импорта в сочетании с asynchronous_consumer_example: https://pika.readthedocs.io/en/stable/examples/asynchronous_consumer_example.html и использовали примитивы eventlet вместо threading для достижения неблокирующего потребителя.

person tim-ireland    schedule 21.08.2018