У меня есть интеграция spring IntegrationFlow
, которая определяется следующим образом:
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("id")
.autoStartup(autoStartup)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> ...)
.handle(serviceActivatorBean)
.get();
А serviceActivatorBean
выглядит так:
@Component
@Transactional
public class ServiceActivator {
@ServiceActivator
public void myMethod(Collection<MyEvent> events) {
....
}
}
Если myMethod
выдает исключение, оно будет зарегистрировано, но повторной попытки не произойдет. Я попытался изменить IntegrationFlow
на это:
RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
RetryTemplate retryTemplate = new RetryTemplate();
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(5);
retryTemplate.setRetryPolicy(retryPolicy);
advice.setRetryTemplate(retryTemplate);
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("id")
.autoStartup(autoStartup)
.adviceChain(advice)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> ...)
.handle(serviceActivatorBean)
.get();
Но затем я получаю сообщение журнала, подобное этому (повторных попыток не произойдет):
2017-06-30 13:18:10.611 WARN 88706 --- [erContainer#1-2] o.s.i.h.a.RequestHandlerRetryAdvice : этот совет org.springframework.integration.handler.advice.RequestHandlerRetryAdvice можно использовать только для обработчиков сообщений; попытка посоветовать метод 'invokeListener' в 'org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1' игнорируется
Как я могу настроить этот IntegrationFlow
, чтобы он вел себя так же, как RabbitListener
? т.е. пусть RabbitMQ снова опубликует сообщения.