Python Pika — потребитель в потоке

Я работаю над приложением Python с фоновым потоком для использования сообщений из очереди RabbitMQ (тематический сценарий).

Я запускаю поток в событии on_click кнопки. Вот мой код, обратите внимание на "#self.receive_command()".

def on_click_start_call(self,widget):


    t_msg = threading.Thread(target=self.receive_command)
    t_msg.start()
    t_msg.join(0)
    #self.receive_command()


def receive_command(self):

    syslog.syslog("ENTERED")

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    syslog.syslog("1")

    channel = connection.channel()
    syslog.syslog("2")

    channel.exchange_declare(exchange='STORE_CMD', type='topic')
    syslog.syslog("3")

    result = channel.queue_declare(exclusive=True)
    syslog.syslog("4")

    queue_name = result.method.queue
    syslog.syslog("5")

    def callback_rabbit(ch,method,properties,body):
        syslog.syslog("RICEVUTO MSG: RKEY:"+method.routing_key+" MSG: "+body+"\n")

    syslog.syslog("6")

    channel.queue_bind(exchange='STORE_CMD', queue=queue_name , routing_key='test.routing.key')
    syslog.syslog("7")

    channel.basic_consume(callback_rabbit,queue=queue_name,no_ack=True)
    syslog.syslog("8")

    channel.start_consuming()

Если я запущу этот код, я не увижу в системном журнале сообщение 1,2,3,5,6,7,8, но увижу только "ENTERED". Итак, код залочен на pika.BlokingConnection.

Если я запускаю один и тот же код (комментируя инструкцию потока и раскомментируя прямой вызов функции), все работает так, как ожидалось, и сообщение принимается правильно.

Есть какие-нибудь решения запустить потребителя в поток?

Заранее спасибо

Давиде


person user2056899    schedule 03.05.2013    source источник


Ответы (2)


Я протестировал код на своей машине с последней версией Pika. Это работает нормально. В Pika есть проблемы с потоками, но пока вы создаете одно соединение для каждого потока, это не должно быть проблемой.

Если у вас возникли проблемы, это, скорее всего, связано с ошибкой в ​​более старой версии Pika или не связанными с вашими потоками проблемами, вызывающими проблему.

Я бы порекомендовал вам избегать версии 0.9.13, так как в ней много ошибок, но очень скоро должен быть выпущен 0.9.14 0.10.0™.

[Изменить] Выпущена версия Pika 0.9.14.

Это код, который я использовал.

def receive_command():
    print("ENTERED")
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    print("1")
    channel = connection.channel()
    print("2")
    channel.exchange_declare(exchange='STORE_CMD', type='topic')
    print("3")
    result = channel.queue_declare(exclusive=True)
    print("4")
    queue_name = result.method.queue
    print("5")
    def callback_rabbit(ch,method,properties,body):
        print("RICEVUTO MSG: RKEY:"+method.routing_key+" MSG: "+body+"\n")
    print("6")
    channel.queue_bind(exchange='STORE_CMD', queue=queue_name , routing_key='test.routing.key')
    print("7")
    channel.basic_consume(callback_rabbit,queue=queue_name,no_ack=True)
    print("8")
    channel.start_consuming()

def start():
    t_msg = threading.Thread(target=receive_command)
    t_msg.start()
    t_msg.join(0)
    #self.receive_command()
start()
person eandersson    schedule 20.06.2013
comment
Спасибо, что поделился. Если мы добавим второй поток (отдельное соединение, отдельную очередь и т. д., поэтому, по определению, никаких проблем с параллелизмом) — как бы вы изящно обработали завершение работы приложения? Например, есть ли какой-либо вред в том, чтобы просто выдать контроль-C (прерывание клавиатуры) или это приводит к тому, что оба потока получают одну и ту же последовательность завершения работы. Любая логика finally и т. д., чтобы закрыть соединения кролика, необходимые родительскому процессу (основному)? - person arcseldon; 29.01.2020
comment
Я просмотрел список рассылки кролика (groups.google.com/forum/#! forum/rabbitmq-users) и еще не нашел ответа. ваш код выше выглядит очень близко к тому, что мне нужно. Просто ищу подтверждение по части выключения. - person arcseldon; 29.01.2020
comment
И последний вопрос - есть ли у вас какие-либо мысли об этом решении (с использованием процессов вместо потоков) - stackoverflow.com/a/45142386/1882064 Есть какие-нибудь особенности завершения работы многопроцессорных систем? - person arcseldon; 29.01.2020

Другой подход - передать метод потока channel.start_consuming в качестве цели, а затем просто передать обратный вызов методу consume. Использование: consume(callback=your_method, queue=your_queue)

import threading

def consume(self, *args, **kwargs):
    if "channel" not in kwargs \
            or "callback" not in kwargs \
            or "queue" not in kwargs \
            or not callable(kwargs["callback"]):
        return None

    channel = kwargs["channel"]
    callback = kwargs["callback"]
    queue = kwargs["queue"]
    channel.basic_consume(callback, queue=queue, no_ack=True)

    t1 = threading.Thread(target=channel.start_consuming)
    t1.start()
    t1.join(0)
person dorintufar    schedule 08.08.2017