Накладывание сообщений на потребителей, когда другие потребители доступны в RabbitMQ - использование bunny for rails

Сообщения очереди в RabbitMQ складываются и ожидают одного потребителя, пока другие потребители доступны.

Мы используем RabbitMQ в качестве нашей службы обмена сообщениями. Мы используем bunny для создания подключений и настройки очередей с прохождением сообщений.

В нашей настройке rails с использованием bunny у нас возникла проблема, когда у нас есть одна очередь с 8 потребителями, которые слушают сообщения в этой очереди. Когда поступают сообщения, в идеале они должны выполняться циклически по примеру потребителей: 4 сообщения в очереди, потребитель 1 принимает сообщение 1, - потребитель 1 занят, потребитель 2 принимает сообщение 2, потребитель 2 занят, потребитель 3 принимает сообщение 3 и т. д.

Однако проблема, с которой мы сталкиваемся, заключается в том, что в очереди 4 сообщения, потребитель 1 забирает сообщение 1, потребитель 1 занят, потребитель 2-8 доступны, но сообщения 2-4 складываются в очередь, ожидая, что потребитель 1 станет доступным, и обрабатывать сообщения.

Мне кажется, что я провел кучу исследований и просто не могу понять, как остановить накопление и ожидание сообщений на одном потребителе.

У кого-нибудь был опыт в этом или есть идеи, как решить эту проблему?

conn = Bunny.new(bunny[0])
conn.start
ch = conn.create_channel
q = ch.queue("#{record_queue_name}", :durable => true)
q.subscribe(:manual_ack => true, :arguments => {"x-priority" => 10}, :block => true) do |delivery_info, properties, payload|
ch.acknowledge(delivery_info.delivery_tag, false)

Мы хотим, чтобы в любое время, когда какие-либо сообщения отправлялись в RabbitMQ, потребитель забирал их в порядке очереди, а не несколько сообщений, складывающихся в стопку, ожидающих занятого потребителя, когда другие доступны.

РЕДАКТИРОВАТЬ: Как воспроизвести: запустить 3 потребителя одновременно. протолкнуть 6 сообщений - потребители 1–3 теперь заняты 3 сообщениями в очереди. перезапустите 2 и 3, когда 2 и 3 снова прослушивают, 3 сообщения все еще ждут в очереди на потребителе 1. Потребители 2 и 3 все еще доступны.

Перезапустите потребителя 1, теперь 3 сообщения в очереди сначала поступают от сервера к вновь перезапущенным потребителям 2 и 3.

Мне нужны сообщения, чтобы первым пришел первый сервер, независимо от того, перезапускаются ли потребители.


person joe    schedule 30.07.2019    source источник
comment
Странно передавать аргумент x- через subscribe. Если вы предоставите код, который надежно воспроизводит это в репозитории, который я могу клонировать и запускать, я могу исследовать.   -  person Luke Bakken    schedule 31.07.2019
comment
У меня нет дополнительного кода, которым я мог бы поделиться публично. - Также я хотел добавить, что если все потребители создаются одновременно и слушают, он делает это в порядке очереди, однако, если какой-либо из потребителей перезапускается, пока сообщения в очереди ожидают, они не будут захватывать ожидающие сообщения после их резервного копирования и прослушивания - это изначально то, что вызывает стек ожидающих сообщений.   -  person joe    schedule 31.07.2019
comment
Вы добавили в свой комментарий очень важную деталь. Измените исходный вопрос и дайте точные инструкции, как это воспроизвести. Если вы не можете поделиться минимальным, работоспособным примером кода, чтобы воспроизвести проблему, очень маловероятно, что я потрачу время, чтобы угадать, как ее решить.   -  person Luke Bakken    schedule 31.07.2019
comment
Оригинал обновлен с учетом того, как я дублирую проблему, логика кода, которую обрабатывает потребитель, позволяет ему оставаться достаточно занятым, чтобы сообщения ожидали в очереди для потребителей.   -  person joe    schedule 31.07.2019


Ответы (1)


RabbitMQ работает по назначению.

Поскольку ваш код не устанавливает QoS / prefetch, RabbitMQ отправляет все шесть < / strong> сообщения вашему первому потребителю. Поскольку этому потребителю требуется время для подтверждения сообщений (имитируется 45-секундным сном в коде), эти шесть остаются в состоянии «Unacked», в то время как вашим двум другим потребителям не над чем работать. Перезапуск этих двух других потребителей не имеет никакого эффекта, поскольку все шесть сообщений находятся в состоянии «Unacked», ожидая подтверждения от первого потребителя.

Когда вы перезапускаете своего первого потребителя, RabbitMQ обнаруживает потерянное соединение и помещает шесть сообщений в состояние «Готово» и доставляет все шесть (наиболее вероятно) другому потребителю, и проблема повторяется.

См. этот пример исполняемого кода, чтобы узнать, как установить предварительную выборку. При предварительной выборке 1 RabbitMQ доставит потребителю не более одного сообщения и будет ждать подтверждения, прежде чем доставить этому потребителю другое сообщение. Таким образом сообщения будут распространяться среди ваших потребителей.


ПРИМЕЧАНИЕ: команда RabbitMQ отслеживает rabbitmq-users список рассылки и лишь иногда отвечает на вопросы по StackOverflow.

person Luke Bakken    schedule 31.07.2019
comment
Спасибо! Это именно то. Не знаю, как я это пропустил. Добавил предварительную выборку, и все работает так, как мне нужно. - person joe; 31.07.2019