Я использую облачную функцию Spring для обработки данных из kafka с помощью Flux. По умолчанию он обрабатывает данные в потоке потребителя (где используется сообщение). Я собираюсь реализовать пул потоков для параллельной обработки и регулирования данных, и в Spring Cloud Integration есть отличная реализация под названием executeorChannel (https://docs.spring.io/spring-integration/api/org/springframework/integration/channel/ExecutorChannel.html)
Пример реализации функции:
public static class FN1 implements Function<Flux<String>, Flux<String>> {
public Flux<String> apply(Flux<String> data) {
return data
.map(f -> doSomething() )
}
}
Так что я не нашел простого способа соединить реализованные таким образом функции через executorChannel.
M.b. есть способ определить тип inputChannel?
UPD: Читайте комментарии под ответом Олега. Они очень полезны.