Интеграция Spring — балансировка нагрузки между несколькими исходящими шлюзами TCP

У меня есть (устаревшая) служба TCP с несколькими процессами. Каждый процесс работает на одном хосте, но на другом порту. Служба является однопоточной, поэтому способ увеличить пропускную способность заключается в циклическом переборе каждого запроса через каждый из портов.

Я предоставляю доступ AMQP к этому унаследованному приложению. Это очень просто - взять строку из очереди AMQP, передать ее приложению и вернуть строку ответа в очередь ответов AMQP.

Это прекрасно работает на одном порту. Тем не менее, я хотел бы распределить запросы по всем портам.

Spring Integration, похоже, предоставляет только AbstractClientConnectionFactory реализации, которые либо подключаются напрямую к одному хосту/порту (TcpNetClientConnectionFactory), либо поддерживают пул подключений к одному хосту/порту (CachingClientConnectionFactory). Между одним хостом и несколькими портами нет соединений такого пула.

Я попытался написать свой собственный AbstractClientConnectionFactory, который поддерживает пул AbstractClientConnectionFactory объектов и круговой перебор между ними. Тем не менее, я столкнулся с несколькими проблемами, связанными с обработкой TCP-соединений, когда целевая служба отключается или сеть прерывается, которые я не смог решить.

Существует также подход к этому вопросу: Spring Integration 4 - настройка LoadBalancingStrategy в Java DSL, но решение этой проблемы заключалось в жестком кодировании количества конечных точек. В моем случае количество конечных точек известно только во время выполнения и настраивается пользователем.

Итак, в основном мне нужно создать TcpOutboundGateway для каждого порта динамически во время выполнения и каким-то образом зарегистрировать его в моем IntegrationFlow. Я попытался сделать следующее:

@Bean
public IntegrationFlow xmlQueryWorkerIntegrationFlow() {
    SimpleMessageListenerContainer inboundQueue = getMessageListenerContainer();

    DirectChannel rabbitReplyChannel = MessageChannels.direct().get();

    IntegrationFlowBuilder builder = IntegrationFlows
            .from(Amqp.inboundGateway(inboundQueue)
                      .replyChannel(rabbitReplyChannel))    
            /* SOMEHOW DO THE ROUND ROBIN HERE */
            //I have tried:
            .channel(handlerChannel()) //doesnt work, the gateways dont get started and the message doesnt get sent to the gateway
            //and I have also tried:
            .handle(gateway1)
            .handle(gateway2) //doesnt work, it chains the handlers instead of round-robining between them
            //
            .transform(new ObjectToStringTransformer())
            .channel(rabbitReplyChannel); 

    return builder.get();

}

@Bean
//my attempt at dynamically adding handlers to the same channel and load balancing between them
public DirectChannel handlerChannel() {
    DirectChannel channel = MessageChannels.direct().loadBalancer(new RoundRobinLoadBalancingStrategy()).get();
    for (AbstractClientConnectionFactory factory : generateConnections()) {
        channel.subscribe(generateTcpOutboundGateway(factory));
    }
    return channel;
}

Кто-нибудь знает, как я могу решить эту проблему?


person Erin Drummond    schedule 22.05.2016    source источник


Ответы (1)


См. пример динамического ftp - по сути, каждый исходящий шлюз идет в своем собственном контексте приложения, а динамический маршрутизатор направляет маршрут к соответствующему каналу (для которого при необходимости создается исходящий адаптер).

Хотя в примере используется XML, вы можете сделать то же самое с конфигурацией Java или даже с Java DSL.

См. мой ответ на похожий вопрос для нескольких почтовых адаптеров IMAP с использованием конфигурации Java, а затем дополнительный вопрос.

person Gary Russell    schedule 23.05.2016