У меня есть поток DSL, который отлично работает с каналом очереди. Однако, когда я делаю его синхронным, используя канал Rendezvous, я получаю скорость подтверждения не более 30 сообщений в секунду. Моим обработчикам требуется всего 350 микросекунд, чтобы завершить процесс, но скорость подтверждения остается низкой. Это резко увеличивает очередь кроликов. Я даже увеличил число одновременных потребителей до 10 и увеличил предварительную выборку, но это не помогло. Затем я добавил еще пару масштабируемых инстансов, но это помогло поднять скорость подтверждения примерно до 45/сек.
Как ускорить подтверждение потока? Я ожидаю скорость более 500 в секунду.
Дсл поток:
SimpleMessageListenerContainer simpleMessageListenerContainer = profileTagRabbitMLCConfig.transactedChannelSpanRabbitSMLC(queueName)
simpleMessageListenerContainer?.setConcurrentConsumers(concurrentConsumer)
simpleMessageListenerContainer?.setPrefetchCount(prefetch)
return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer))
.channel(rendezvousTransformerChannel1())
.transform(myTransformer, 'transform', { e -> e.advice(adviceWithRecoverer) })
.channel(rendezvousTransformerChannel2())
.handle(myHandler, 'save', { e -> e.advice(adviceWithRecoverer) })
.get()
Синхронные каналы:
@Bean
MessageChannel rendezvousTransformerChannel1() {
return MessageChannels.rendezvous().get()
}
@Bean
MessageChannel rendezvousHandlerChannel() {
return MessageChannels.rendezvous().get()
}
Контейнер:
SimpleMessageListenerContainer
transactedChannelSpanRabbitSMLC(CachingConnectionFactory rabbitConnectionFactory, String queueName){
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer()
container.setConnectionFactory(rabbitConnectionFactory)
container.setQueueNames(queueName)
container.setChannelTransacted(true)
container
}
Рекомендации по восстановлению для повторной попытки:
Advice getRetryAdviceWithRecovery() {
RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice()
advice.setRetryTemplate(getRetryTemplate())
advice.recoveryCallback = getRecoveryCallback() // sends message to rabbit exchange
advice
}
Опрос:
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata poller() {
return Pollers.fixedDelay(100).maxMessagesPerPoll(500L).get();
}