Оптимизация использования сообщений от rabbitmq с помощью Spring Integration

Я пытаюсь создать IntegrationFlowFactory, чтобы легко создавать потоки интеграции для передачи событий между контекстами приложения.

Кажется, все работает, и события публикуются очень быстро.

Однако я не могу понять, почему потребление так медленно. Добавление concurrentConsumers или изменение prefetchCount ничего не меняет.

В других сообщениях говорится о медленной работе сети, но, как вы можете видеть в RabbitConfig, я использую localhost.

У меня есть репозиторий с моим примером интеграции Spring здесь: https://github.com/teplyuska/spring-integration-example


person heuts    schedule 02.11.2017    source источник
comment
Если кажется, что увеличение предварительной выборки ничего не меняет, вероятно, это проблема в коде прослушивателя; пора профилировать ваше приложение. Переход от 10 к 100 может не иметь значения, но переход от 1 к 10, безусловно, должен (если слушатель легковесен).   -  person Gary Russell    schedule 02.11.2017
comment
Скачаю ваш образец локально, чтобы немного поиграть...   -  person Artem Bilan    schedule 02.11.2017
comment
Добавление большего количества concurrentConsumers повысит общую пропускную способность, но она по-прежнему будет очень низкой на одного потребителя.   -  person heuts    schedule 02.11.2017
comment
Смотрите мой ответ для решения.   -  person Artem Bilan    schedule 02.11.2017


Ответы (1)


Ваша проблема здесь:

Amqp.inboundGateway(getListenerContainer(queue, concurrentConsumers, prefetchCount)

Тем временем ваш нисходящий поток является односторонним и не возвращает никакого ответа:

.handle(p -> {
                UpdateSecretEvent payload = (UpdateSecretEvent) p.getPayload();
                System.out.println("Account: " + payload.getAccountId() + " has secret: " + payload.getNewSecret());
 })
.get();

or

.handle(p -> {
                UpdateEmailEvent payload = (UpdateEmailEvent) p.getPayload();
                System.out.println("Account: " + payload.getAccountId() + " has email: " + payload.getEmail());
})
.get();

Итак, этот AmqpInboundGateway ждет ответа в своем MessagingTemplate.sendAndReceive() для private static final long DEFAULT_TIMEOUT = 1000L;

Переключение на Amqp.inboundAdapter() помогает.

person Artem Bilan    schedule 02.11.2017