Как мы можем настроить отложенную организацию очереди с помощью Bunny gem и RabbitMQ?

Я настроил RabbitMQ и могу мгновенно публиковать и использовать сообщения с помощью bunny gem из приложения rails. Как мы можем добиться отложенной постановки в очередь с заранее определенным временем задержки для каждого сообщения при публикации на бирже RabbitMQ?


person Ashik Salman    schedule 01.02.2017    source источник
comment
Мне интересно. Почему вы хотите отложить сообщение?   -  person dnsh    schedule 01.02.2017
comment
Я выполняю некоторые фоновые задания здесь. И некоторые работы должны быть выполнены после определенной временной задержки.   -  person Ashik Salman    schedule 01.02.2017


Ответы (1)


Использование подключаемого модуля обмена отложенными сообщениями RabbitMQ: для отложенных сообщений доступен подключаемый модуль, Плагин обмена отложенными сообщениями RabbitMQ. Этот подключаемый модуль добавляет новый тип обмена в RabbitMQ, и сообщения, направляемые на этот обмен, могут быть задержаны на указанное время.

Объединить TTL и DLX для задержки доставки сообщения. Другой вариант — объединить TTL и DLX для задержки доставки сообщения. Объединив их с функциями, мы публикуем сообщение в очереди, срок действия сообщения которой истекает после TTL, а затем перенаправляем его на биржу и с ключом маршрутизации недоставленных сообщений, чтобы они попадали в очередь, из которой мы получаем.

require 'bunny'

B = Bunny.new ENV['CONNECTION_URL']
B.start

DELAYED_QUEUE='work.later'
DESTINATION_QUEUE='work.now'

def publish
  ch = B.create_channel
  # declare a queue with the DELAYED_QUEUE name
  ch.queue(DELAYED_QUEUE, arguments: {
    # set the dead-letter exchange to the default queue
    'x-dead-letter-exchange' => '',
    # when the message expires, set change the routing key into the destination queue name
    'x-dead-letter-routing-key' => DESTINATION_QUEUE,
    # the time in milliseconds to keep the message in the queue
    'x-message-ttl' => 3000
  })
  # publish to the default exchange with the the delayed queue name as routing key,
  # so that the message ends up in the newly declared delayed queue
  ch.default_exchange.publish 'message content', routing_key: DELAYED_QUEUE
  puts "#{Time.now}: Published the message"
  ch.close
end

def subscribe
  ch = B.create_channel
  # declare the destination queue
  q = ch.queue DESTINATION_QUEUE, durable: true 
  q.subscribe do |delivery, headers, body|
    puts "#{Time.now}: Got the message"
  end
end

subscribe()
publish()

sleep

Как описано здесь: https://www.cloudamqp.com/docs/delayed-messages.html

person Lovisa Johansson    schedule 01.02.2017