Динамическое создание клиента TCP с входящим каналом и каналом ответа

Я новичок в интеграции Spring.

Работа с Spring 4, только аннотации Java.

В проекте, в котором я сейчас работаю, мы настроили TCP-соединение в файле свойств.

На данный момент он жестко запрограммирован только на 2 разных соединения, и его необходимо изменить на более динамичный подход, при котором мы можем установить переменное их количество в файле свойств и иметь возможность добавлять новые во время выполнения.

Мне известно о существовании пример динамического клиента tcp и попытался построить на нем свою работу.

Сначала мы настраиваем следующий bean-компонент для соединения:

@Bean(name = "node1TCPConnection")
public AbstractClientConnectionFactory node1TCPConnection() {
  final TcpNetClientConnectionFactory tcpNetClientConnectionFactory = new TcpNetClientConnectionFactory(
  env.getProperty("socket.tcp.nodes[0].ip"), 
  env.getProperty("socket.tcp.nodes[0].port", Integer.class)
  );

  tcpNetClientConnectionFactory.setSingleUse(false);
  tcpNetClientConnectionFactory.setSoKeepAlive(true);

  final ByteArrayLengthHeaderSerializer by = new ByteArrayLengthHeaderSerializer(headBytes);

  tcpNetClientConnectionFactory.setSerializer(by);
  tcpNetClientConnectionFactory.setDeserializer(by);
  return tcpNetClientConnectionFactory;
}

Затем у нас есть адаптер, который ждет отправки чего-либо:

@Bean
public TcpReceivingChannelAdapter node1TcpReaderClient(
        @Qualifier("node1TCPConnection") final AbstractClientConnectionFactory connectionFactory) {
    final TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
    adapter.setConnectionFactory(connectionFactory);
    adapter.setClientMode(true);
    adapter.setErrorChannelName("errorChannel");
    adapter.setRetryInterval(retryInterval);
    adapter.setOutputChannel(fromTcp());
    return adapter;
}

При вызове fromTcp() сообщение преобразуется, и следующий код отправляет его другому приложению для дальнейшей обработки.

@ServiceActivator(inputChannel = "fromTcp")
public void outbound(final String inMessage, final @Headers Map<String, Object> headerMap) {
    sendToApi(inMessage, headerMap);
}

Когда сообщение обработано, мы должны отправить ответ.

@Bean
@ServiceActivator(inputChannel = "toTcpCh01")
public TcpSendingMessageHandler tcpOutGateCh01(
        final @Qualifier("node1TCPConnection") AbstractClientConnectionFactory connectionFactory) {
    final TcpSendingMessageHandler tcpSendingMsgHandler = new TcpSendingMessageHandler();
    tcpSendingMsgHandler.setConnectionFactory(connectionFactory);
    return tcpSendingMsgHandler;
}

и с помощью шлюза:

@MessagingGateway()
public interface MessageTcpGateway {

  @Gateway(requestChannel = "toTcpCh01")
  ListenableFuture<Void> sendTcpChannel01(@Header("host") String host, byte[] inMessage);
}

отправляем обратно.

На примере я могу понять, как динамически создавать поток для ответа.

Но я не могу понять, как создать общий пул соединений, а затем создавать прослушивающие адаптеры и адаптеры ответов на лету на основе этих соединений, а затем закрывать/удалять их во время выполнения.

Я немного понимаю, как создать поток с входящим адаптером благодаря этот вопрос

Нужно ли создавать несколько отдельных IntegrationFlow для каждого адаптера? поэтому все вызовы и ответы могут обрабатываться асинхронно (я могу ошибаться насчет асинхронности)

а затем обрабатывать их отдельно, когда хотите закрыть соединение? например, позвонить близко к TcpReceiveChannelAdapter, а затем к TcpSendingMessageHandler и, наконец, отменить регистрацию connectonfactory?


person xerido    schedule 15.06.2018    source источник


Ответы (1)


Я не знаю этого для Взаимодействующие адаптеры каналов вам нужны отдельные определения IntegrationFlow для TcpReceivingChannelAdapter и TcpSendingMessageHandler. Это действительно можно сделать одним IntegrationFlow, начиная с TcpReceivingChannelAdapter и заканчивая TcpSendingMessageHandler. Дело в том, что IntegrationFlow сам по себе является просто логическим контейнером для группировки ссылок на компоненты. Тяжелая работа действительно выполняется всеми теми компонентами, которые вы там объявляете, и с этим TcpReceivingChannelAdapter до TcpSendingMessageHandler и шлюзом между ними вы действительно будете асинхронными.

Пожалуйста, имейте в виду, что ByteArrayLengthHeaderSerializer также должен быть объявлен как bean-компонент. Не уверен, что вам нужен отдельный экземпляр для каждого динамического потока, но в любом случае есть API, чтобы сделать это оттуда:

    /**
     * Add an object which will be registered as an {@link IntegrationFlow} dependant bean in the
     * application context. Usually it is some support component, which needs an application context.
     * For example dynamically created connection factories or header mappers for AMQP, JMS, TCP etc.
     * @param bean an additional arbitrary bean to register into the application context.
     * @return the current builder instance
     */
    IntegrationFlowRegistrationBuilder addBean(Object bean);
person Artem Bilan    schedule 15.06.2018
comment
Вы говорите, что мне нужно зарегистрировать все компоненты как bean-компоненты? Или хотя бы TcpNetClientConnectionFactory и TcpReceiveChannelAdapter? И чтобы закрыть их, я должен сначала закрыть адаптеры, а затем connectionFactory? - person xerido; 18.06.2018
comment
Нет, я говорю только о ByteArrayLengthHeaderSerializer как о бобах. Все остальное действительно автоматически регистрируется процессором IntegrationFlow как bean-компоненты в контексте приложения. Вам не нужно ничего закрывать вручную - достаточно IntegrationFlow. Он будет распространяться близко ко всем своим зависимостям. - person Artem Bilan; 18.06.2018
comment
Спасибо, и последнее признание. я должен зарегистрировать их обоих таким образом? this.flowContext.registration(inflow).addBean(cf).id(hostPort + .in).register(); this.flowContext.registration(outflow).addBean(cf).id(hostPort + .out).register(); или я должен зарегистрировать бин только один раз в одном? - person xerido; 19.06.2018
comment
Нет, вам нужно зарегистрировать этот cf только один раз, так как вам действительно нужен один и тот же экземпляр для потоков inbound и outbound. - person Artem Bilan; 19.06.2018