Повторите попытку весенней интеграции IntegrationFlow при исключении

У меня есть интеграция spring IntegrationFlow, которая определяется следующим образом:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .autoStartup(autoStartup)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> ...)
                    .handle(serviceActivatorBean)
                    .get();

А serviceActivatorBean выглядит так:

@Component
@Transactional
public class ServiceActivator {

    @ServiceActivator

    public void myMethod(Collection<MyEvent> events) {
        ....
    }    
}

Если myMethod выдает исключение, оно будет зарегистрировано, но повторной попытки не произойдет. Я попытался изменить IntegrationFlow на это:

RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(5);
retryTemplate.setRetryPolicy(retryPolicy);
advice.setRetryTemplate(retryTemplate);

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
                    .id("id")
                    .autoStartup(autoStartup)
                    .adviceChain(advice)
                    .concurrentConsumers(2)
                    .maxConcurrentConsumers(3)
                    .messageConverter(messageConverter()))
                    .aggregate(a -> ...)
                    .handle(serviceActivatorBean)
                    .get();

Но затем я получаю сообщение журнала, подобное этому (повторных попыток не произойдет):

2017-06-30 13:18:10.611 WARN 88706 --- [erContainer#1-2] o.s.i.h.a.RequestHandlerRetryAdvice : этот совет org.springframework.integration.handler.advice.RequestHandlerRetryAdvice можно использовать только для обработчиков сообщений; попытка посоветовать метод 'invokeListener' в 'org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1' игнорируется

Как я могу настроить этот IntegrationFlow, чтобы он вел себя так же, как RabbitListener? т.е. пусть RabbitMQ снова опубликует сообщения.


person Johan    schedule 30.06.2017    source источник


Ответы (1)


Используйте перехватчик повторных попыток в цепочке рекомендаций адаптера. вместо RequestHandlerRetryAdvice - это для использования конечных точек, как говорится в сообщении.

person Gary Russell    schedule 30.06.2017