RabbitMQ/Sneakers - ограничить конкретную очередь только одним рабочим за раз?

У меня есть вариант использования RabbitMQ и драгоценного камня Sneakers, где у меня запущено несколько рабочих процессов, отвечающих на несколько десятков очередей в моем проекте. Поэтому очень вероятно, что рабочие процессы могут одновременно обрабатывать сообщения из одной и той же очереди.

В частности, с одной очередью — назовем ее :one_at_a_time — я хочу, чтобы только один рабочий процесс мог обрабатывать сообщение из очереди в любой момент времени.

Причина, по которой я хочу это сделать, заключается в том, что рабочий процесс предназначен для выполнения следующих действий:

  1. Поиск объекта AR по переданному идентификатору
  2. Check that an attribute - let's say :worked - is set.
    1. If true, then ack! the message.
    2. Если false, отправьте электронное письмо пользователю, а затем установите для :worked значение true.

Это разработано таким образом, чтобы я случайно не отправил пользователю электронное письмо дважды, если два сообщения создаются в быстрой последовательности с одним и тем же идентификатором объекта. И этот дизайн будет работать нормально, если только один рабочий процесс когда-либо прослушивал эту очередь в любой момент времени, потому что первый запуск будет проходить через шаги 1 -> 2 -> 2, а следующий запуск будет проходить через шаги 1 -> 2 -> 1 и не будет отправлять пользователю электронное письмо. Но при тестировании я обнаружил, что существует возможность возникновения состояния гонки, когда два работника одновременно извлекают сообщение из очереди :one_at_a_time, проходят проверку на то, что :worked установлен, и оба отправляют электронное письмо.

Имея все это в виду, есть ли способ ограничить количество рабочих, которые слушают очередь? Спасибо.


person Argus9    schedule 09.10.2017    source источник
comment
Вы нашли решение для этого? Столкнулся с такой же проблемой, и было бы интересно узнать, как вы ее решили.   -  person Paul Etscheit    schedule 16.01.2018


Ответы (1)


Для получения дополнительной информации запрос Argus9 можно заархивировать, выполнив следующие действия:

1) Вы можете контролировать выбор вашего работника:

class YourWorker
include Sneakers::Worker
from_queue "your_queue",
           :env => nil,
           :ack => true,
           :workers => 1, #Number of per-cpu processes to run
           :prefetch => 1, #This param will define that single message will be fetched per time
           :threads => 1, #This will define that you have single thread running
           :heartbeat => 2,
           :share_threads => true,
           :timeout_job_after => 3600,
           :exchange => 'your_exchange'

def work(args={})
 #... your steps here
end 
end

2) Вам нужно обратить внимание на то, что вы указали в своем файле crossers.rb в качестве начальных параметров (поскольку он используется Sneakers::Runner при инициализации рабочего процесса), поэтому убедитесь, что там указаны правильные параметры, например:

Sneakers.configure  :amqp => url,
                :daemonize => true,
                :ack => true,
                :prefetch => 1,
                :threads => 1,
                :start_worker_delay => 0.1,  
                :workers => 1,               
                :exchange => "your_exchange",
                :exchange_type => :direct,
                :log => "log/sneakers.log"
Sneakers.logger.level = Logger::DEBUG

Вы также можете создать некоторые дополнительные элементы управления с помощью RabbitMQ API, которые позволят вам проверять такие вещи, как, обрабатывается ли уже какое-то сообщение?... и т. д., что не так просто архивировать с помощью bunny и так далее. Используя очень простой код, например:

    def queue_info
    queues_infos = {}    
    rabbitmqctl_url = "http://127.0.0.1:15672"
    rabbitmqctl_user = "your_user"
    rabbitmqctl_password = "your_password"
    uri = URI.parse("#{rabbitmqctl_url}/api/queues")
    request = Net::HTTP::Get.new(uri)
    request.basic_auth(rabbitmqctl_user, rabbitmqctl_password)
    req_options = { use_ssl: uri.scheme == 'https' }
    response = Net::HTTP.start(uri.hostname, uri.port, req_options)  do |http|
      http.request(request)
    end
    queue_details = JSON.parse(response.body)
    queue_details.each do |queue|
      queues_infos[queue['name'].to_s] = {  name: queue['name'],
                                            msg_total: queue['messages'],
                                            msg_ready: queue['messages_ready'],
                                            msg_unacknowlged: queue['messages_unacknowledged'],
                                            state: queue['state'],
                                            consumers: queue['consumers'] }
    end
    return queues_infos
end
person Mario    schedule 30.10.2018