Отклонение и повторная постановка задачи RabbitMQ в очередь, когда prefetch_count == 1

Предположим, у меня есть очередь с пятью элементами:

(tail) E, D, C, B, A (head)

Я потребляю сообщения из головы этой очереди, но решаю, что сообщение A не подходит для обработки в настоящее время. Я reject этот элемент с requeue=True, и очередь становится:

(tail) A, E, D, C, B (head)

Затем я потребляю B, C, D и E, ackя каждое из них. Теперь очередь содержит только A, которые я постоянно потребляю и reject снова и снова в бесконечном цикле. Если приходит новое сообщение, отличное от A, оно потребляется почти сразу, после чего процесс возобновляет цикл попыток потреблять A.

Я делаю это с небольшой модификацией примера Twisted Consumer из документации Pika:

import pika
from pika import exceptions
from pika.adapters import twisted_connection
from twisted.internet import defer, reactor, protocol,task


@defer.inlineCallbacks
def run(connection):

    channel = yield connection.channel()

    exchange = yield channel.exchange_declare(exchange='topic_link',type='topic')

    queue = yield channel.queue_declare(queue='hello', auto_delete=False, exclusive=False)

    yield channel.queue_bind(exchange='topic_link',queue='hello',routing_key='hello.world')

    #yield channel.basic_qos(prefetch_count=1)

    queue_object, consumer_tag = yield channel.basic_consume(queue='hello',no_ack=False)

    l = task.LoopingCall(read, queue_object)

    l.start(0.01)


@defer.inlineCallbacks
def read(queue_object):

    ch,method,properties,body = yield queue_object.get()

    print body

    if body == 'A':
        yield ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
    else:
        yield ch.basic_ack(delivery_tag=method.delivery_tag)


parameters = pika.ConnectionParameters()
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)
d = cc.connectTCP('hostname', 5672)
d.addCallback(lambda protocol: protocol.ready)
d.addCallback(run)
reactor.run()

Проблема. Обратите внимание на следующую закомментированную строку:

#yield channel.basic_qos(prefetch_count=1)

Когда я раскомментирую это, и потребитель достигнет сообщения A, он сразу же подберет его снова после rejecting, игнорируя любые другие элементы, которые могут ожидать в очереди за ним. Вместо того, чтобы помещать отклоненный элемент в хвост очереди, он просто продолжает пробовать его снова, снова и снова, полностью блокируя все остальное в очереди.

С закомментированной строкой он работает правильно (хотя и немного медленнее). Если в строке присутствует и prefetch_count > 1, то тоже работает. Что-то в установке ровно 1 вызывает такое поведение.

Есть ли шаг, который я пропустил при отклонении сообщения A? Или система предварительной выборки Pika принципиально несовместима с этим крайним случаем?


person smitelli    schedule 20.06.2014    source источник


Ответы (1)


Если у вас есть только один потребитель, у RabbitMQ нет другого пути, кроме как отправить сообщение тому же потребителю, от которого оно было отклонено (независимо от того, как: с помощью basic.reject или basic.nack).

Когда вы установите prefetch_count > 1, ваш потребитель будет иметь ваше зацикленное сообщение плюс новое из головы рядом с зацикленным (буквально, ваше зацикленное сообщение останется в заголовке).

Если вы случайно получите N*M зацикленные сообщения с prefetch_count <= N и номером получателя ‹= M, все сообщения будут зациклены (что приведет к перегрузке ЦП и т. д.), поэтому неплохо было бы проверить флаг сообщения rejected и иметь некоторую продвинутую логику. если сообщение уже было повторно доставлено.

person pinepain    schedule 20.06.2014
comment
Предположим, я не возражал бы тратить процессорное время на многократное повторение одних и тех же сообщений. Есть ли способ полностью отклонить это сообщение из очереди предварительной выборки, чтобы любые действительные сообщения, стоящие за ним, могли пройти до того, как цикл вернется? - person smitelli; 25.06.2014
comment
Вы можете полностью отбросить недопустимое сообщение и, возможно, поместить его в другую очередь с неверными буквами для устранения это позже вручную или каким-либо другим способом, но это зависит от того, насколько активен ваш поток сообщений. - person pinepain; 25.06.2014