Работает ли Spring Integration ServiceActivator с типами Project Reactor?

Я использую Spring Integration 5.1.5 (с RabbitMQ, использующим spring-integration-amqp), и я читаю в документы о том, что Spring Integration поддерживает типы реактора проекта (под этим я подразумеваю Mono, Flux и т. д.) . Но я не могу заставить это работать для ServiceActivator. . Я пробую что-то вроде этого:

@ServiceActivator
public Mono<Void> myMethod(List<Message> messages) {
   Mono<Void> result = myService.doServiceStuff(messages);
   return result;
}

(обратите внимание, что я также пытаюсь заставить myMethod работать с Flux<Message>, но это отдельная проблема).

Когда myMethod возвращает Mono<Void>, я получаю эту ошибку:

Caused by: org.springframework.messaging.core.DestinationResolutionException: no output-channel or replyChannel header available
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162)
    ... 42 common frames omitted

Изменение метода на:

@ServiceActivator
public void myMethod(List<Message> messages) {
   Mono<Void> result = myService.doServiceStuff(messages);
   result.subscribe(); // This is not what I want to do
}

и подписка на реактивный поток вручную заставит его работать, однако, очевидно, это не то, что я хочу делать. Я бы предпочел, чтобы структура spring-integration справилась с подпиской.

Поддерживается ли это в Spring Integration? Если да, то что я делаю не так?


person Johan    schedule 27.05.2019    source источник


Ответы (1)


Чего именно вы пытаетесь достичь, возвращая Mono<Void>?

Когда метод активатора службы возвращает какое-либо значение, это значение отправляется в выходной канал. Когда это Mono<?>, отправка выполняется после завершения моно.

Просто установите тип возвращаемого значения void.

person Gary Russell    schedule 28.05.2019
comment
Вы определенно не должны ожидать, что Framework подпишется на этот Mono за вас. IMHO, это злоупотребление спецификацией Reactive Streams для безоговорочной подписки. Вы можете добавить outputChannel в качестве FluxMessage канала, и тогда Framework добавит этот Mono в качестве источника для общего Flux. Но все же подписка должна происходить от вас или от какого-либо протокола, например WbFlux или RSocket. - person Artem Bilan; 28.05.2019