Используя Ruby-клиент RabbitMQ "bunny", я хочу, чтобы мой производитель (некоторый код Ruby) отправил сообщение потребителю (работнику, использующему гем "sneakers"), и я хочу, чтобы мой производитель не выполнял ни одной строки своего кода Ruby до тех пор, пока производитель получает подтверждение того, что потребитель получил мое сообщение и проделал с ним некоторую работу.
В моем потребителе я выполняю некоторую работу, а затем вызываю метод ack!
кроссовок, чтобы подтвердить, что сообщение получено и работа выполнена.
В моем производителе я вызываю confirm_select
на своем экземпляре Bunny::Channel
, чтобы поместить его в код подтверждения, и после publish
передачи моих сообщений я вызываю wait_for_confirms
на канале, чтобы якобы дождаться, пока все мои сообщения не будут ack!
обработаны потребителем. (Я попытался реализовать то, что нашел в документации по кроликам здесь.)
Однако похоже, что мой производитель не ждет, пока потребитель позвонит ack!
. Я вхожу в систему как со своим производителем, так и со своим потребителем и обнаруживаю, что мой производитель, кажется, думает, что сообщения были подтверждены до того, как потребитель действительно их подтвердит.
Как заставить производителя RabbitMQ ждать, пока потребитель закончит свою работу в Ruby?
Руби 2.3.3, RabbitMQ 3.6.12, Эрланг 17.3.
Вот мой файл блокировки:
GEM
specs:
amq-protocol (2.2.0)
bunny (2.7.0)
amq-protocol (>= 2.2.0)
concurrent-ruby (1.0.5)
serverengine (1.5.11)
sigdump (~> 0.2.2)
sigdump (0.2.4)
sneakers (2.6.0)
bunny (~> 2.7.0)
concurrent-ruby (~> 1.0)
serverengine (~> 1.5.11)
thor
thor (0.20.0)
PLATFORMS
ruby
DEPENDENCIES
bunny
sneakers
BUNDLED WITH
1.14.6
Вот мой потребитель/работник (consumer_worker.rb
):
class ConsumerWorker
include Sneakers::Worker
from_queue 'do-work-here',
exchange: 'do-work-here',
exchange_type: :direct,
durable: true,
prefetch: 1,
arguments: {
:'x-dead-letter-exchange' => 'do-work-here-retry'
},
timeout_job_after: 5,
retry_timeout: 60000,
ack: true
def work(msg)
open('ruby-debug.log', 'a') do |f|
f.puts "message received: #{msg}"
end
sleep 1
open('ruby-debug.log', 'a') do |f|
f.puts "acknowledging message at: #{Time.now.to_i}"
end
ack!
open('ruby-debug.log', 'a') do |f|
f.puts "acknowledged message at: #{Time.now.to_i}"
end
end
end
На одной вкладке терминала я запускаю этого работника с помощью:
bundle exec sneakers work ConsumerWorker --require consumer_worker.rb
Вот мой издатель (publisher.rb
):
require 'bunny'
connection = Bunny.new('amqp://guest:guest@localhost:5672').tap(&:start)
channel = connection.create_channel
channel.confirm_select
queue = channel.queue('do-work-here',
{arguments: {:'x-dead-letter-exchange' => 'do-work-here-retry'},
durable: true})
queue.publish('hello world', persistent: true)
channel.wait_for_confirms
open('ruby-debug.log', 'a') do |f|
f.puts "messages confirmed at: #{Time.now.to_i}"
end
Когда я запускаю следующую команду на другой вкладке:
ruby ./publisher.rb
Тогда мой файл журнала (./ruby-debug.log
) содержит следующие строки:
message received: hello world
messages confirmed at: 1505774819
acknowledging message at: 1505774820
acknowledged message at: 1505774820
Я хочу, чтобы порядок событий был таким:
message received
acknowledging message
acknowledged message
messages confirmed
Как мне это осуществить?