Spring Integration Публикация канала подписки с Spring DSL

Я пытаюсь создать простой поток, в котором мой поток начинается с получения HTTP-запроса Post через адаптер входящего канала HTTP и публикует его в «SubscribableChannel». На этот канал может быть подписано N потребителей. На рисунке ниже показан поток.

введите здесь описание изображения

Я пытаюсь использовать Spring DSL для настройки этого потока, и у меня возникли проблемы с его работой. Ниже мой код.

@Bean
public IntegrationFlow receiveHttpPost() {
    return IntegrationFlows.from(Http.inboundChannelAdapter("/receive")
            .mappedRequestHeaders("*")
            .requestChannel(httpInAdapterPubSubChannel()))
            .transform(new ObjectToStringTransformer())
            .get();
}

@Bean
public SubscribableChannel  httpInAdapterPubSubChannel()
{
    return MessageChannels.publishSubscribe("httpInAdapterPubSubChannel")
    .get();
}

@Bean
public IntegrationFlow subscriber1() {
    return IntegrationFlows.from(httpInAdapterPubSubChannel())
            .handle( message -> System.out.println("Enrich Headers based on Payload...."))
            .get();
}

@Bean
public IntegrationFlow subscriber2() {
     return IntegrationFlows.from(httpInAdapterPubSubChannel())
                .handle( message -> System.out.println("Save Payload to Audit Table..."))
                .get();
}

Когда я запускаю этот поток, я получаю «Не удалось обработать сообщение; вложенное исключение - org.springframework.messaging.core.DestinationResolutionException: нет заголовка output-channel или replyChannel».

o.s.i.channel.PublishSubscribeChannel    : preSend on channel 'httpInAdapterPubSubChannel', message: GenericMessage [payload=Test, headers={content-length=4, http_requestMethod=POST, accept-language=en-US,en;q=0.8, accept=*/*, host=localhost:8080, http_requestUrl=http://localhost:8080/receive, connection=keep-alive, content-type=text/plain;charset=UTF-8, id=2c6ee729-96ee-1ae5-be31-a9bc56092758, cache-control=no-cache, accept-encoding=gzip, deflate, br, user-agent=Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36, timestamp=1484457726393}]
o.s.i.t.MessageTransformingHandler       : org.springframework.integration.transformer.MessageTransformingHandler#0 received message: GenericMessage [payload=Test, headers={content-length=4, http_requestMethod=POST, accept-language=en-US,en;q=0.8, accept=*/*, host=localhost:8080, http_requestUrl=http://localhost:8080/receive, connection=keep-alive, content-type=text/plain;charset=UTF-8, id=2c6ee729-96ee-1ae5-be31-a9bc56092758, cache-control=no-cache, accept-encoding=gzip, deflate, br, user-agent=Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36, timestamp=1484457726393}]
o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.messaging.MessagingException: Failed to handle Message; nested exception is org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available] with root cause

org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:287) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:212) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:129) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) ~[spring-integration-core-4.3.6.RELEASE.jar:4.3.6.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.5.RELEASE.jar:4.3.5.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.5.RELEASE.jar:4.3.5.RELEASE]

Совершенно очевидно, что я делаю что-то ОЧЕНЬ неправильно. Я попытался найти примеры, показывающие «Опубликовать канал подписки» VIA Spring Integration DSL ИЛИ конфигурацию Java. К сожалению, мне не удалось найти ни одного: - /. Я был бы искренне признателен, если бы кто-нибудь мог предоставить мне пример и помочь мне выяснить, что не так с моим потоком.

Еще одно наблюдение, которое я сделал, было: «Когда я удалил подписчиков subscriber1 и subscriber2», я все еще получаю ту же ошибку. Это означает, что я делаю что-то не так при настройке y HttpInboundAdapter.

Кроме того, ЕСЛИ я переключаю httpInAdapterPubSubChannel на прямой и имею только один поток маршрута (без ветвления), все работает нормально.


person Rajeev K    schedule 15.01.2017    source источник


Ответы (1)


.transform(new ObjectToStringTransformer()) пытается отправить результат куда-то, но не знает куда - входящий адаптер не ожидает ответа, а преобразователю некуда отправить данные.

Возможно, вы имели в виду что-то вроде этого ...

@Bean
public IntegrationFlow receiveHttpPost() {
    return IntegrationFlows.from(Http.inboundChannelAdapter("/receive")
        .mappedRequestHeaders("*"))
        .transform(new ObjectToStringTransformer())
        .channel(httpInAdapterPubSubChannel())
        .get();
}

т.е. отправить результат преобразователя в pub / sub канал.

В справочнике DSL есть несколько примеров; здесь и здесь, например.

person Gary Russell    schedule 15.01.2017
comment
Спасибо за ответ и ссылки Гэри. Вы правильно угадали, я хотел преобразовать входные данные и отправить преобразованные данные в канал Pub-Sub и допустил ошибку. Изменил, как вы рекомендовали, и он работает правильно. - person Rajeev K; 15.01.2017