Как проверить, нет ли сообщения в RabbitMQ с помощью Pika и Python

Я читаю сообщения от RabbitMQ с библиотекой pika python. Чтение сообщений в цикле выполняется

connection = rpc.connect()
channel = connection.channel()
channel.basic_consume(rpc.consumeCallback, queue=FromQueue, no_ack=Ack)
channel.start_consuming()

Это прекрасно работает. Но у меня также есть необходимость прочитать одно единственное сообщение, что я и делаю:

method, properties, body = channel.basic_get(queue=FromQueue)
rpc.consumeCallback(Channel=channel,Method=method, Properties=properties,Body=body)

Но когда в очереди нет сообщения, скрипт крашится. Как реализовать метод get_empty(), описанный здесь< /а> ?


person cwhisperer    schedule 05.10.2018    source источник
comment
channel.start_consuming блокирует. Как вы можете позвонить channel.basic_get? Вы используете отдельные темы?   -  person noxdafox    schedule 05.10.2018
comment
Нет, я использую любой из них. Это параметр, который решает, какой из них используется.   -  person cwhisperer    schedule 07.10.2018


Ответы (3)


Я временно решил это, проверив ответ, например:

method, properties, body = channel.basic_get(queue=FromQueue)
if(method == None):
    ## queue is empty
person cwhisperer    schedule 08.10.2018

вы можете проверить пустое тело следующим образом:

def callback(ch, method, properties, body):
    decodeBodyInfo = body.decode('utf-8')
    if decodeBodyInfo != '':
        cacheResult = decodeBodyInfo
        ch.stop_consuming()

Это так просто и удобно :D

person Mike Graham    schedule 09.05.2019

Если вы используете генератор channel.consume в цикле for, вы можете установить параметр inactivity_timeout.

Из документов pika,

:param float inactivity_timeout: if a number is given (in seconds), will cause the
method to yield (None, None, None) after the given period of inactivity; this 
permits for pseudo-regular maintenance activities to be carried out by the user 
while waiting for messages to arrive. If None is given (default), then the method 
blocks until the next event arrives. NOTE that timing granularity is limited by the 
timer resolution of the underlying implementation.NEW in pika 0.10.0.

поэтому изменение кода на что-то вроде этого может помочь

        for method_frame, properties, body in channel.consume(queue, inactivity_timeout=120):

            # break of the loop after 2 min of inactivity (no new item fetched)
            if method_frame is None
                break

Не забудьте правильно обработать канал и соединение после выхода из цикла.

person Joe Haddad    schedule 06.01.2021