Медленное подтверждение канала Spring Integration Rendezous

У меня есть поток DSL, который отлично работает с каналом очереди. Однако, когда я делаю его синхронным, используя канал Rendezvous, я получаю скорость подтверждения не более 30 сообщений в секунду. Моим обработчикам требуется всего 350 микросекунд, чтобы завершить процесс, но скорость подтверждения остается низкой. Это резко увеличивает очередь кроликов. Я даже увеличил число одновременных потребителей до 10 и увеличил предварительную выборку, но это не помогло. Затем я добавил еще пару масштабируемых инстансов, но это помогло поднять скорость подтверждения примерно до 45/сек.

Как ускорить подтверждение потока? Я ожидаю скорость более 500 в секунду.

Дсл поток:

SimpleMessageListenerContainer simpleMessageListenerContainer = profileTagRabbitMLCConfig.transactedChannelSpanRabbitSMLC(queueName)

simpleMessageListenerContainer?.setConcurrentConsumers(concurrentConsumer)
            simpleMessageListenerContainer?.setPrefetchCount(prefetch)

            return IntegrationFlows.from(Amqp.inboundAdapter(simpleMessageListenerContainer))
                    .channel(rendezvousTransformerChannel1())
                    .transform(myTransformer, 'transform', { e -> e.advice(adviceWithRecoverer) })
                    .channel(rendezvousTransformerChannel2())
                    .handle(myHandler, 'save', { e -> e.advice(adviceWithRecoverer) })
                    .get()

Синхронные каналы:

@Bean
MessageChannel rendezvousTransformerChannel1() {
    return MessageChannels.rendezvous().get()
}

@Bean
MessageChannel rendezvousHandlerChannel() {
    return MessageChannels.rendezvous().get()
}

Контейнер:

SimpleMessageListenerContainer 
transactedChannelSpanRabbitSMLC(CachingConnectionFactory rabbitConnectionFactory, String queueName){

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer()
    container.setConnectionFactory(rabbitConnectionFactory)
    container.setQueueNames(queueName)
    container.setChannelTransacted(true)
    container
}

Рекомендации по восстановлению для повторной попытки:

Advice getRetryAdviceWithRecovery() {
    RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice()
    advice.setRetryTemplate(getRetryTemplate())
    advice.recoveryCallback = getRecoveryCallback() // sends message to rabbit exchange
    advice
}

Опрос:

@Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata poller() {
        return Pollers.fixedDelay(100).maxMessagesPerPoll(500L).get();
    }

person Pradeepkumar Patil    schedule 04.03.2018    source источник


Ответы (1)


Какой вариант использования побуждает вас использовать RendezvousChannels ?

Это довольно редко, и я не думаю, что когда-либо видел 2 в одном потоке.

У вас должен быть опрос для этого типа канала; Я их не вижу, поэтому это означает, что у вас есть компонент опроса по умолчанию.

Нужно показать свой поллер, но я подозреваю, что он плохо настроен для этого. RendezvousChannel send() блокирует, пока что-то не сделает receive().

Несмотря на это, вы рискуете потерять сообщение, если используете какой-либо тип передачи потоков (QueueChannel, RendezvousChannel) в потоке контейнера прослушивателя.

Вероятно, вам следует просто удалить те .channel() из вашего потока, которые вместо этого будут использовать DirectChannel.

Если вам нужен параллелизм, используйте свойство concurrentConsumers в контейнере прослушивателя.

container.setChannelTransacted(true)

Если вы публикуете сообщения в myHandler, транзакции также обходятся очень дорого.

person Gary Russell    schedule 04.03.2018
comment
Спасибо Гэри. Просто переместил код в исходное описание. Мой вариант использования - ждать подтверждения AUTO, пока обработчик полностью не обработает запросы. Когда я использую канал очереди, сообщение подтверждается немедленно, даже если преобразователь/обработчик не закончил обработку сообщения. По этой причине я даже полагался на InboundGateway. - person Pradeepkumar Patil; 04.03.2018
comment
Здесь вообще не следует использовать QueueChannel или RendezvousChannel; вы хотите, чтобы обработчик выполнялся в потоке контейнера; полностью удалите .channel(), чтобы подтверждение AUTO не произошло до тех пор, пока не вернется myHandler.save(). .from().transform().handle(). Входящий шлюз здесь также не подходит. - person Gary Russell; 04.03.2018
comment
Попробую предложенный подход. Спасибо, Гэри, за быструю помощь сегодня. - person Pradeepkumar Patil; 04.03.2018
comment
Use Case+: причина, по которой нам нужен опрашиваемый канал, состоит в том, что немногие конечные точки REST, с которыми наш обработчик попадает в линию, не могут справиться с большой скоростью. Поэтому мы настроили опросчик по умолчанию, который будет регулировать каждые 100 миллисекунд и каждый раз опрашивать только 500 сообщений. Когда мы использовали канал очереди, мы настроили 1000 как глубину очереди. В очереди Rabbit 1 миллион сообщений. Уходя от опрашиваемых каналов к подписываемым, таким как прямой канал (по умолчанию), могу ли я по-прежнему добиваться регулирования сообщений, а также транзакционного АВТОМАТИЧЕСКОГО подтверждения до тех пор, пока handler.save() не вернется одновременно? - person Pradeepkumar Patil; 04.03.2018
comment
Это неправильная архитектура, поскольку сообщение подтверждается, как только опрашивающий получает сообщение из первого RendezvousChannel. Если вы хотите контролировать скорость сообщений, рассмотрите возможность использования нового Polled Адаптер входящего канала и MessageSourcePollingTemplate , где вы можете контролировать скорость потребления сообщений. - person Gary Russell; 04.03.2018