Поместите неподтвержденное сообщение (в случае исключения) в другую очередь в RabbitMQ

В качестве исключения при обработке сообщения из RabbitMQ я просто хотел отменить подтверждение и вместо этого поместить конкретное сообщение в другую очередь или повторно поставить в очередь в ту же очередь или полностью отбросить сообщение (согласно последнему логическому флагу @requeue в basicNack).

Вся идея заключается в том, что позже я могу получить количество неподтвержденных сообщений и проверить формат сообщения и т. Д. Вместо того, чтобы снова и снова ставить в очередь один и тот же канал, а также я хочу отправить неподтвержденный сигнал на текущий канал.

К вашему сведению, я установил режим подтверждения канала как ручной (т.е. container.setAcknowledgeMode(AcknowledgeMode.MANUAL);)

Это то, что я делаю сейчас.

public class My***Listener implements ChannelAwareMessageListener{

try{

    @Override
    public void onMessage(Message message,Channel channel) throws Exception {   
    String s = new String(message.getBody());
    //some logic
    //after successful ack manually
    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
    }
catch(Exception e){
      //currently on exception i am unack the channel
      channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
}

Любая помощь очень ценна.


person lambodar    schedule 18.08.2014    source источник
comment
В качестве комментария к этому вопросу - то, что вы действительно спрашиваете, - это лучший способ получить количество сообщений о распаковке и проверить формат сообщения, поэтому я бы начал с этого и конкретно уточнил, какие методы вы пробовали.   -  person theMayer    schedule 18.08.2014


Ответы (3)


Вы можете отправить их в очередь недоставленных сообщений. Это довольно стандартный шаблон.

https://www.rabbitmq.com/dlx.html

person Enno Shioji    schedule 18.08.2014
comment
Спасибо, Энно, в настоящее время я использую spring-amqp, поэтому я не выполняю настройку обмена или замедление очереди (кроме установки имени очереди в контейнер) со стороны кодирования. Есть ли аналогичный способ в spring-amqp? - person lambodar; 18.08.2014
comment
@Lambodar: вы можете настроить свою политику непосредственно в брокере, а не в своем коде, использующем spring-amqp (подробности см. В ссылке). IMO, это предпочтительнее, чем настраивать его в вашем коде. - person Enno Shioji; 18.08.2014
comment
@EnnoShioji Скажите, пожалуйста, каждое отклоненное сообщение отправляется на DLX? Независимо от типа исключения? - person ; 25.03.2017

Вам нужно что-то вроде этого:

@Bean
RetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateless()
            .withMaxAttempts(5)
            .setRecoverer(new RepublishMessageRecoverer(amqpTemplate(), "bar", "baz"))
            .build();
}

Примечание: работает только с spring-amqp 1.3+, см. также Ссылка

person Jaiwo99    schedule 18.08.2014

Если вы предпочитаете использовать объявления, см. этот ответ для примера.

Чтобы направлять сообщения в DLX, вы можете установить defaultRequeuRejected в false (в контейнере прослушивателя). Или вы можете бросить AmqpRejectAndDontRequeueException, чтобы сообщить контейнеру, что вы хотите, чтобы это сообщение было отклонено (и не помещено в очередь).

Поведение контейнера по умолчанию — повторная отправка отклоненных сообщений в очередь.

Вы можете использовать перехватчик повторных попыток с RejectAndDontRequeueRecoverer для автоматического создания исключения; или, как говорит @ Jawo99, вы можете использовать средство восстановления повторной публикации - это дает дополнительное преимущество, заключающееся в добавлении трассировки стека в качестве заголовка. DLX просто направляет исходное сообщение.

person Gary Russell    schedule 18.08.2014