У меня есть (устаревшая) служба TCP с несколькими процессами. Каждый процесс работает на одном хосте, но на другом порту. Служба является однопоточной, поэтому способ увеличить пропускную способность заключается в циклическом переборе каждого запроса через каждый из портов.
Я предоставляю доступ AMQP к этому унаследованному приложению. Это очень просто - взять строку из очереди AMQP, передать ее приложению и вернуть строку ответа в очередь ответов AMQP.
Это прекрасно работает на одном порту. Тем не менее, я хотел бы распределить запросы по всем портам.
Spring Integration, похоже, предоставляет только AbstractClientConnectionFactory
реализации, которые либо подключаются напрямую к одному хосту/порту (TcpNetClientConnectionFactory
), либо поддерживают пул подключений к одному хосту/порту (CachingClientConnectionFactory
). Между одним хостом и несколькими портами нет соединений такого пула.
Я попытался написать свой собственный AbstractClientConnectionFactory
, который поддерживает пул AbstractClientConnectionFactory
объектов и круговой перебор между ними. Тем не менее, я столкнулся с несколькими проблемами, связанными с обработкой TCP-соединений, когда целевая служба отключается или сеть прерывается, которые я не смог решить.
Существует также подход к этому вопросу: Spring Integration 4 - настройка LoadBalancingStrategy в Java DSL, но решение этой проблемы заключалось в жестком кодировании количества конечных точек. В моем случае количество конечных точек известно только во время выполнения и настраивается пользователем.
Итак, в основном мне нужно создать TcpOutboundGateway
для каждого порта динамически во время выполнения и каким-то образом зарегистрировать его в моем IntegrationFlow
. Я попытался сделать следующее:
@Bean
public IntegrationFlow xmlQueryWorkerIntegrationFlow() {
SimpleMessageListenerContainer inboundQueue = getMessageListenerContainer();
DirectChannel rabbitReplyChannel = MessageChannels.direct().get();
IntegrationFlowBuilder builder = IntegrationFlows
.from(Amqp.inboundGateway(inboundQueue)
.replyChannel(rabbitReplyChannel))
/* SOMEHOW DO THE ROUND ROBIN HERE */
//I have tried:
.channel(handlerChannel()) //doesnt work, the gateways dont get started and the message doesnt get sent to the gateway
//and I have also tried:
.handle(gateway1)
.handle(gateway2) //doesnt work, it chains the handlers instead of round-robining between them
//
.transform(new ObjectToStringTransformer())
.channel(rabbitReplyChannel);
return builder.get();
}
@Bean
//my attempt at dynamically adding handlers to the same channel and load balancing between them
public DirectChannel handlerChannel() {
DirectChannel channel = MessageChannels.direct().loadBalancer(new RoundRobinLoadBalancingStrategy()).get();
for (AbstractClientConnectionFactory factory : generateConnections()) {
channel.subscribe(generateTcpOutboundGateway(factory));
}
return channel;
}
Кто-нибудь знает, как я могу решить эту проблему?