Подождите, пока производитель сообщения не получит подтверждение о завершении работы потребителем.

Используя Ruby-клиент RabbitMQ "bunny", я хочу, чтобы мой производитель (некоторый код Ruby) отправил сообщение потребителю (работнику, использующему гем "sneakers"), и я хочу, чтобы мой производитель не выполнял ни одной строки своего кода Ruby до тех пор, пока производитель получает подтверждение того, что потребитель получил мое сообщение и проделал с ним некоторую работу.

В моем потребителе я выполняю некоторую работу, а затем вызываю метод ack! кроссовок, чтобы подтвердить, что сообщение получено и работа выполнена.

В моем производителе я вызываю confirm_select на своем экземпляре Bunny::Channel, чтобы поместить его в код подтверждения, и после publish передачи моих сообщений я вызываю wait_for_confirms на канале, чтобы якобы дождаться, пока все мои сообщения не будут ack! обработаны потребителем. (Я попытался реализовать то, что нашел в документации по кроликам здесь.)

Однако похоже, что мой производитель не ждет, пока потребитель позвонит ack!. Я вхожу в систему как со своим производителем, так и со своим потребителем и обнаруживаю, что мой производитель, кажется, думает, что сообщения были подтверждены до того, как потребитель действительно их подтвердит.

Как заставить производителя RabbitMQ ждать, пока потребитель закончит свою работу в Ruby?

Руби 2.3.3, RabbitMQ 3.6.12, Эрланг 17.3.

Вот мой файл блокировки:

GEM
  specs:
    amq-protocol (2.2.0)
    bunny (2.7.0)
      amq-protocol (>= 2.2.0)
    concurrent-ruby (1.0.5)
    serverengine (1.5.11)
      sigdump (~> 0.2.2)
    sigdump (0.2.4)
    sneakers (2.6.0)
      bunny (~> 2.7.0)
      concurrent-ruby (~> 1.0)
      serverengine (~> 1.5.11)
      thor
    thor (0.20.0)

PLATFORMS
  ruby

DEPENDENCIES
  bunny
  sneakers

BUNDLED WITH
   1.14.6

Вот мой потребитель/работник (consumer_worker.rb):

class ConsumerWorker
  include Sneakers::Worker

  from_queue 'do-work-here',
             exchange: 'do-work-here',
             exchange_type: :direct,
             durable: true,
             prefetch: 1,
             arguments: {
               :'x-dead-letter-exchange' => 'do-work-here-retry'
             },
             timeout_job_after: 5,
             retry_timeout: 60000,
             ack: true

  def work(msg)
    open('ruby-debug.log', 'a') do |f|
      f.puts "message received: #{msg}"
    end
    sleep 1
    open('ruby-debug.log', 'a') do |f|
      f.puts "acknowledging message at: #{Time.now.to_i}"
    end
    ack!
    open('ruby-debug.log', 'a') do |f|
      f.puts "acknowledged message at: #{Time.now.to_i}"
    end
  end
end

На одной вкладке терминала я запускаю этого работника с помощью:

bundle exec sneakers work ConsumerWorker --require consumer_worker.rb

Вот мой издатель (publisher.rb):

require 'bunny'
connection = Bunny.new('amqp://guest:guest@localhost:5672').tap(&:start)
channel = connection.create_channel
channel.confirm_select
queue = channel.queue('do-work-here',
                      {arguments: {:'x-dead-letter-exchange' => 'do-work-here-retry'},
                       durable: true})
queue.publish('hello world', persistent: true)
channel.wait_for_confirms
open('ruby-debug.log', 'a') do |f|
  f.puts "messages confirmed at: #{Time.now.to_i}"
end

Когда я запускаю следующую команду на другой вкладке:

ruby ./publisher.rb

Тогда мой файл журнала (./ruby-debug.log) содержит следующие строки:

message received: hello world
messages confirmed at: 1505774819
acknowledging message at: 1505774820
acknowledged message at: 1505774820

Я хочу, чтобы порядок событий был таким:

message received
acknowledging message
acknowledged message
messages confirmed

Как мне это осуществить?


person Jackson    schedule 18.09.2017    source источник
comment
Я думаю, что мое понимание того, как должно работать подтверждение издателя, неверно. См. lists.rabbitmq.com/pipermail/rabbitmq-discuss/2014. -Январь/. Этот учебник может быть полезен для решения моей проблемы: rabbitmq.com/tutorials /tutorial-six-ruby.html   -  person Jackson    schedule 19.09.2017


Ответы (1)


Издатель подтверждает только связь между издателем и RabbitMQ. Издатели не знают о потребителях.

См. пример в руководстве 6, если шаблон запроса/ответа: https://www.rabbitmq.com/getstarted.html.

ConditionVariable — это обычно используемый примитив параллелизма, используемый для задержки дальнейших действий до тех пор, пока не произойдет событие, с необязательным тайм-аутом.

Подтверждения издателя и подтверждения потребителя задокументированы на странице http://www.rabbitmq.com/confirms.htm.

person Michael Klishin    schedule 18.09.2017
comment
Любая идея, как выполнить RPC с кроссовками? - person Jackson; 19.09.2017