У меня есть сценарий, в котором мне нужно очень быстро распределять и обрабатывать задания. У меня будет около 45 заданий, быстро заполняемых в очереди, и я могу обрабатывать около 20 одновременно (5 машин, по 4 ядра каждая). Каждое задание занимает разное количество времени, и, чтобы усложнить ситуацию, сбор мусора является проблемой, поэтому мне нужно иметь возможность отключать потребителя для сбора мусора.
В настоящее время у меня все работает с pop (каждый потребитель выскакивает каждые 5 мс). Это кажется нежелательным, потому что это приводит к 600 поп-запросам в секунду к rabbitmq.
Я бы хотел, чтобы была всплывающая команда, которая действовала бы как подписка, но только для одного сообщения. (процесс будет заблокирован, ожидая ввода от соединения rabbitMQ через что-то похожее на Kernel.select)
Я пытался обмануть гем AMQP, чтобы сделать что-то подобное, но это не работает: я не могу отказаться от подписки, пока очередь не станет пустой и потребителю больше не будут отправляться сообщения. Другие способы отписки, боюсь, приведут к потере сообщений.
потреблять_1.rb :
require "amqp"
EventMachine.run do
puts "connecting..."
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts "Received a message: #{payload}."
queue.unsubscribe { puts "unbound" }
sleep 3
end
end
потребитель_много.rb:
require "amqp"
# Imagine the command is something CPU - intensive like image processing.
command = "sleep 0.1"
EventMachine.run do
puts "connecting..."
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
queue.subscribe do |payload|
puts "Received a message: #{payload}."
end
end
производитель.rb:
require "amqp"
i = 0
EventMachine.run do
connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
puts "Connected to AMQP broker"
channel = AMQP::Channel.new(connection)
queue = channel.queue("tasks", :auto_delete => true)
exchange = AMQP::Exchange.default(channel)
EM.add_periodic_timer(1) do
msg = "Message #{i}"
i+=1
puts "~ publishing #{msg}"
end
end
Я запущу Consumer_many.rb и Producer.rb. Сообщения будут поступать, как и ожидалось.
Когда я запускаю Consumer_1.rb, он получает каждое второе сообщение (как и ожидалось). Но он НИКОГДА не отписывается, потому что никогда не заканчивает обработку всех своих сообщений... и так далее.
Как сделать, чтобы Consumer_1.rb подписался на очередь, получил одно сообщение, а затем вышел из кольца балансировщика нагрузки, чтобы он мог выполнять свою работу, не теряя никаких дополнительных ожидающих заданий, которые могут быть в очереди и будут иначе будет запланирована отправка в процесс?
Тим