Использовать несколько очередей в python / pika

Я пытаюсь создать потребителя, который будет подписываться на несколько очередей, а затем обрабатывать сообщения по мере их поступления.

Проблема в том, что когда в первой очереди уже присутствуют некоторые данные, они потребляют первую очередь и никогда не перейдут на использование второй очереди. Однако, когда первая очередь пуста, она переходит в следующую очередь, а затем использует обе очереди одновременно.

Сначала я реализовал многопоточность, но хочу избежать этого, когда библиотека pika сделает это за меня без особых сложностей. Ниже мой код:

import pika

mq_connection = pika.BlockingConnection(pika.ConnectionParameters('x.x.x.x'))
mq_channel = mq_connection.channel()
mq_channel.basic_qos(prefetch_count=1)


def callback(ch, method, properties, body):
    print body
    mq_channel.basic_ack(delivery_tag=method.delivery_tag)

mq_channel.basic_consume(callback, queue='queue1', consumer_tag="ctag1.0")
mq_channel.basic_consume(callback, queue='queue2', consumer_tag="ctag2.0")
mq_channel.start_consuming()

person user3295878    schedule 01.07.2014    source источник
comment
Я пробовал ваш код с единственным изменением: добавление регистратора для предотвращения исключений и объявление очередей. Код работает как положено. Я публикую несколько сообщений в каждую очередь, и сообщения маршрутизируются и отражаются в интерфейсе командной строки.   -  person old_sound    schedule 02.07.2014
comment
Эй, можешь попробовать с предварительно заполненными очередями, а затем запустить потребителя. Сообщите мне, работает ли это так, как ожидалось.   -  person user3295878    schedule 02.07.2014
comment
Я просто попробовал это, и это не сработало. Я вижу сообщения только из первой очереди.   -  person old_sound    schedule 02.07.2014
comment
Это то, о чем я говорю. Странно, не правда ли? Есть идеи?   -  person user3295878    schedule 02.07.2014
comment
Я мало что знаю о клиенте python, поэтому я попросил Гэвина ответить   -  person old_sound    schedule 03.07.2014
comment
Работает ли он так же с другими клиентами? Можете ли вы попробовать это в любом другом клиенте? Если это особенность пищухи, то ее придется поднять. Хотя Гэвин дал хорошее предложение, оно уже было реализовано.   -  person user3295878    schedule 03.07.2014
comment
Я только что попробовал с клиентом php-amqplib, и он работает, как ожидалось. Я предварительно публикую сообщения в обе очереди, а затем все они используются.   -  person old_sound    schedule 03.07.2014
comment
Хорошо знать. Я собирался поднять этот вопрос, когда узнал, что Гэвин является автором pika. Теперь, похоже, Гэвин должен мне помочь.   -  person user3295878    schedule 03.07.2014
comment
Возможный дубликат Python и RabbitMQ - Лучший способ прослушивать события с нескольких каналов?   -  person Vineet Menon    schedule 03.11.2015


Ответы (4)


Одно из возможных решений - использовать неблокирующее соединение и потреблять сообщения.

import pika


def callback(channel, method, properties, body):
    print(body)
    channel.basic_ack(delivery_tag=method.delivery_tag)


def on_open(connection):
    connection.channel(on_channel_open)


def on_channel_open(channel):
    channel.basic_consume(callback, queue='queue1')
    channel.basic_consume(callback, queue='queue2')


parameters = pika.URLParameters('amqp://guest:guest@localhost:5672/%2F')
connection = pika.SelectConnection(parameters=parameters,
                                   on_open_callback=on_open)

try:
    connection.ioloop.start()
except KeyboardInterrupt:
    connection.close()

Это будет подключаться к нескольким очередям и соответственно потреблять сообщения.

person ChillarAnand    schedule 20.02.2017
comment
Не могли бы вы сказать мне цель% 2F в конце? - person Rápli András; 13.03.2017
comment
@ RápliAndrás При подключении к rabbitmq необходимо указать виртуальный хост. Хост по умолчанию - /, который заменяется на %2f. - person ChillarAnand; 13.03.2017
comment
Стоит отметить, что этот код у меня не работал с Pika 1.1.0. Достаточно просто добавить on_open_callback = в метод on_open: connection.channel (on_open_callback = on_channel_open) и on_message_callback = в метод on_channel_open: channel.basic_consume (on_open_callback = callback, queue_consume (on_open_callback = callback, queue_consume) = ' , queue = 'queue2') - person StratocastFlo; 03.02.2021
comment
Есть ли способ определить приоритет очередей? - person ThunderPhoenix; 04.04.2021

Проблема, скорее всего, заключается в том, что первый вызов вызвал Basic.Consume и уже получил сообщения из предварительно заполненной очереди до того, как был выпущен второй вызов. Вы можете попробовать установить счетчик предварительной выборки QoS равным 1, что ограничит RabbitMQ возможность отправлять вам более одного сообщения за раз.

person Gavin M. Roy    schedule 02.07.2014
comment
Как видно из кода, он уже установлен на 1. Что еще вы можете придумать? - person user3295878; 02.07.2014
comment
Еще одна вещь, я не думаю, что потребитель действительно начнет потреблять, пока не достигнет start_consuming. Однако необходимо проверить. - person user3295878; 02.07.2014
comment
Привет, Гэвин, я только что попробовал эту функциональность с библиотекой kombu для python, и она сработала, как ожидалось. - person user3295878; 03.07.2014
comment
Я попробую использовать бэкэнд Async и дам вам знать. - person user3295878; 04.07.2014
comment
Я подписался на pika.readthedocs.org/en/0.9.13/examples/, и все заработало, как ожидалось. Первая очередь была полностью использована, а затем соответственно использована вторая очередь. - person user3295878; 04.07.2014
comment
@ user3295878 Как вы это сделали? Вы в основном взяли свой код выше и заменили его там, где вы объявляете очередь - person Johnathon64; 09.03.2016
comment
Я перешел с пика на комбу. - person user3295878; 10.03.2016

Подобно комментариям в первом ответе выше, я смог получить аналогичные результаты с pika 1.1.0 и следующим:

import pika

def queue1_callback(ch, method, properties, body):
  print(" [x] Received queue 1: %r" % body)

def queue2_callback(ch, method, properties, body):
  print(" [x] Received queue 2: %r" % body)

def on_open(connection):
  connection.channel(on_open_callback = on_channel_open)


def on_channel_open(channel):
  channel.basic_consume('queue1', queue1_callback, auto_ack = True)
  channel.basic_consume('queue2', queue2_callback, auto_ack = True)

credentials = pika.PlainCredentials('u', 'p')
parameters = pika.ConnectionParameters('localhost', 5672, '/', credentials)
connection = pika.SelectConnection(parameters = parameters, on_open_callback = on_open)

Try:
  connection.ioloop.start()
except KeyboardInterrupt:
  connection.close()
  connection.ioloop.start()

person 5demayo    schedule 04.02.2021

Стоит отметить, что указанное выше решение работает, только если auto_ack имеет значение True.

person Mayuresh Gaitonde    schedule 24.02.2021