Предположим, у меня есть очередь с пятью элементами:
(tail) E, D, C, B, A (head)
Я потребляю сообщения из головы этой очереди, но решаю, что сообщение A
не подходит для обработки в настоящее время. Я reject
этот элемент с requeue=True
, и очередь становится:
(tail) A, E, D, C, B (head)
Затем я потребляю B
, C
, D
и E
, ack
я каждое из них. Теперь очередь содержит только A
, которые я постоянно потребляю и reject
снова и снова в бесконечном цикле. Если приходит новое сообщение, отличное от A
, оно потребляется почти сразу, после чего процесс возобновляет цикл попыток потреблять A
.
Я делаю это с небольшой модификацией примера Twisted Consumer из документации Pika:
import pika
from pika import exceptions
from pika.adapters import twisted_connection
from twisted.internet import defer, reactor, protocol,task
@defer.inlineCallbacks
def run(connection):
channel = yield connection.channel()
exchange = yield channel.exchange_declare(exchange='topic_link',type='topic')
queue = yield channel.queue_declare(queue='hello', auto_delete=False, exclusive=False)
yield channel.queue_bind(exchange='topic_link',queue='hello',routing_key='hello.world')
#yield channel.basic_qos(prefetch_count=1)
queue_object, consumer_tag = yield channel.basic_consume(queue='hello',no_ack=False)
l = task.LoopingCall(read, queue_object)
l.start(0.01)
@defer.inlineCallbacks
def read(queue_object):
ch,method,properties,body = yield queue_object.get()
print body
if body == 'A':
yield ch.basic_reject(delivery_tag=method.delivery_tag, requeue=True)
else:
yield ch.basic_ack(delivery_tag=method.delivery_tag)
parameters = pika.ConnectionParameters()
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)
d = cc.connectTCP('hostname', 5672)
d.addCallback(lambda protocol: protocol.ready)
d.addCallback(run)
reactor.run()
Проблема. Обратите внимание на следующую закомментированную строку:
#yield channel.basic_qos(prefetch_count=1)
Когда я раскомментирую это, и потребитель достигнет сообщения A
, он сразу же подберет его снова после reject
ing, игнорируя любые другие элементы, которые могут ожидать в очереди за ним. Вместо того, чтобы помещать отклоненный элемент в хвост очереди, он просто продолжает пробовать его снова, снова и снова, полностью блокируя все остальное в очереди.
С закомментированной строкой он работает правильно (хотя и немного медленнее). Если в строке присутствует и prefetch_count > 1
, то тоже работает. Что-то в установке ровно 1
вызывает такое поведение.
Есть ли шаг, который я пропустил при отклонении сообщения A
? Или система предварительной выборки Pika принципиально несовместима с этим крайним случаем?