Подпишитесь на очередь, получите 1 сообщение, а затем отпишитесь

У меня есть сценарий, в котором мне нужно очень быстро распределять и обрабатывать задания. У меня будет около 45 заданий, быстро заполняемых в очереди, и я могу обрабатывать около 20 одновременно (5 машин, по 4 ядра каждая). Каждое задание занимает разное количество времени, и, чтобы усложнить ситуацию, сбор мусора является проблемой, поэтому мне нужно иметь возможность отключать потребителя для сбора мусора.

В настоящее время у меня все работает с pop (каждый потребитель выскакивает каждые 5 мс). Это кажется нежелательным, потому что это приводит к 600 поп-запросам в секунду к rabbitmq.

Я бы хотел, чтобы была всплывающая команда, которая действовала бы как подписка, но только для одного сообщения. (процесс будет заблокирован, ожидая ввода от соединения rabbitMQ через что-то похожее на Kernel.select)

Я пытался обмануть гем AMQP, чтобы сделать что-то подобное, но это не работает: я не могу отказаться от подписки, пока очередь не станет пустой и потребителю больше не будут отправляться сообщения. Другие способы отписки, боюсь, приведут к потере сообщений.

потреблять_1.rb :

require "amqp"

EventMachine.run do
  puts "connecting..."
  connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
  puts "Connected to AMQP broker"

  channel = AMQP::Channel.new(connection)
  queue = channel.queue("tasks", :auto_delete => true)
  exchange = AMQP::Exchange.default(channel)

  queue.subscribe do |payload|
    puts "Received a message: #{payload}."
    queue.unsubscribe { puts "unbound" }
    sleep 3
  end
end

потребитель_много.rb:

require "amqp"

# Imagine the command is something CPU - intensive like image processing.
command = "sleep 0.1"

EventMachine.run do
  puts "connecting..."
  connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
  puts "Connected to AMQP broker"

  channel = AMQP::Channel.new(connection)
  queue = channel.queue("tasks", :auto_delete => true)
  exchange = AMQP::Exchange.default(channel)

  queue.subscribe do |payload|
    puts "Received a message: #{payload}."
  end
end

производитель.rb:

require "amqp"

i = 0
EventMachine.run do
  connection = AMQP.connect(:host => "localhost", :user => "guest", :pass => "guest", :vhost => "/")
  puts "Connected to AMQP broker"

  channel = AMQP::Channel.new(connection)
  queue = channel.queue("tasks", :auto_delete => true)
  exchange = AMQP::Exchange.default(channel)

  EM.add_periodic_timer(1) do
    msg = "Message #{i}"
    i+=1
    puts "~ publishing #{msg}"
  end
end

Я запущу Consumer_many.rb и Producer.rb. Сообщения будут поступать, как и ожидалось.

Когда я запускаю Consumer_1.rb, он получает каждое второе сообщение (как и ожидалось). Но он НИКОГДА не отписывается, потому что никогда не заканчивает обработку всех своих сообщений... и так далее.

Как сделать, чтобы Consumer_1.rb подписался на очередь, получил одно сообщение, а затем вышел из кольца балансировщика нагрузки, чтобы он мог выполнять свою работу, не теряя никаких дополнительных ожидающих заданий, которые могут быть в очереди и будут иначе будет запланирована отправка в процесс?

Тим


person Tim Harper    schedule 15.04.2011    source источник
comment
Написать сервер, который ведет себя таким образом, было бы глупо-легко в ruby... может быть, использование zero-mq для реализации моего собственного брокера является решением?   -  person Tim Harper    schedule 16.04.2011


Ответы (2)


Это простая, но очень плохо документированная функция гема AMQP, вам нужно следующее:

В вашем потребителе:

channel = AMQP::Channel.new(connection, :prefetch => 1)

А затем с вашим блоком подписки выполните:

queue.subscribe(:ack => true) do |queue_header, payload|
  puts "Received a message: #{payload}."
  # Do long running work here

  # Acknowledge message
  queue_header.ack
end

Что это делает, он говорит RabbitMQ отправлять вашему потребителю только одно сообщение за раз и не отправлять другое сообщение, пока вы не вызовете ack в заголовке очереди после работы над длительной задачей в течение некоторого времени.

Возможно, меня нужно будет поправить, но я считаю, что обмен direct лучше подходит для этой задачи.

person Ivan    schedule 16.04.2011
comment
Иван, ты гений. Я искал это в течение нескольких месяцев. СПАСИБО! - person Tim Harper; 16.04.2011
comment
Я не наполовину удивлен, я несколько месяцев бился головой о стол, прежде чем понял это, прочитав исходный код. - person Ivan; 16.04.2011
comment
Иван - Я не думаю, что тип обмена здесь имеет большое значение. Его проблема находится на уровне очереди, потому что похоже, что он хочет, чтобы несколько потребителей работали из одной и той же очереди, каждый по очереди извлекая один рабочий элемент и обрабатывая его. Таким образом, тип используемого обмена не имеет значения, поскольку обмен по существу выпадает из поля зрения, как только сообщение помещается в очередь. - person Brian Kelly; 17.04.2011

С окружающей средой, которая у меня есть,

Версия RabbitMQ: 3.3.3

версия драгоценного камня amqp: 1.5.0

Решение Ивана по-прежнему приводило к тому, что все сообщения извлекались из очереди.

Вместо этого количество неподтвержденных сообщений при подписке на очередь можно ограничить, настроив QoS канала.

Согласно документу API AMQP::Channel,

#qos(prefetch_size = 0, prefetch_count = 32, global = false, &block) ⇒ Object

Одно примечание к этому методу: если вы используете серверы RabbitMQ после версии 2.3.6, prefetch_size устарел.

channel = AMQP::Channel.new(connection, :prefetch => 1)
channel.qos(0, 1)

queue = channel.queue(queue_name, :auto_delete => false)
queue.subscribe(:ack => true) do |metadata, payload|
  puts "Received a message: #{payload}."

  # Do long running work here

  # Acknowledge message
  metadata.ack
end

Надеюсь, решение поможет кому-то.

Ваше здоровье.

person Jack Wu    schedule 27.03.2015