Использование Spring Integration executorChannel с функцией Spring Cloud

Я использую облачную функцию Spring для обработки данных из kafka с помощью Flux. По умолчанию он обрабатывает данные в потоке потребителя (где используется сообщение). Я собираюсь реализовать пул потоков для параллельной обработки и регулирования данных, и в Spring Cloud Integration есть отличная реализация под названием executeorChannel (https://docs.spring.io/spring-integration/api/org/springframework/integration/channel/ExecutorChannel.html)

Пример реализации функции:

public static class FN1 implements Function<Flux<String>, Flux<String>> {
  public Flux<String> apply(Flux<String> data) {
    return data
      .map(f ->  doSomething() )      
  }
}

Так что я не нашел простого способа соединить реализованные таким образом функции через executorChannel.

M.b. есть способ определить тип inputChannel?

UPD: Читайте комментарии под ответом Олега. Они очень полезны.


person Elijah    schedule 13.04.2020    source источник


Ответы (1)


Вы имеете в виду что-то вроде этого?

@SpringBootApplication
public class SampleFunctoinAppApplication  {

    public static void main(String[] args) throws Exception {

        ApplicationContext context = SpringApplication.run(SampleFunctoinAppApplication.class, args);
        SubscribableChannel output = context.getBean("output", SubscribableChannel.class);
        output.subscribe(System.out::println);

        MessageChannel channel = context.getBean("executorChannel", MessageChannel.class);
        channel.send(new GenericMessage<String>("hello"));
    }

    @Bean
    public IntegrationFlow flow() {
        return IntegrationFlows
                .from("executorChannel")
                .transform(echo())
                .channel("output")
                .get();
    }

    @Bean
    public ExecutorChannel executorChannel() {
        return new ExecutorChannel(Executors.newCachedThreadPool());
    }

    public Function<String, String> echo() {
        return v -> v;
    }
}

Что вы имеете в виду под «определением типа inputChannel»?

person Oleg Zhurakousky    schedule 13.04.2020
comment
Да, но одна из основных функций SCF заключается в том, что вы можете реализовать только функцию или несколько и связать друг друга с внешней конфигурацией, используя привязки из облачных потоков swping и function.definition В вашем примере эти определения жестко запрограммированы. - person Elijah; 13.04.2020
comment
определите тип inputChannel следующим образом: `` @Bean @Transformer (inputChannel = headerEnricherChannel, outputChannel = transformChannel) public HeaderEnricher headerEnricher () {return new HeaderEnricher (Collections.singletonMap (SimpMessageHeaderAccessionHeaderAccessor ‹SESSION_ ); } `` ` - person Elijah; 13.04.2020
comment
Затем вы говорите о spring-cloud-stream и функциональных привязках - cloud.spring.io/spring-cloud-static/spring-cloud-stream/, верно? - person Oleg Zhurakousky; 13.04.2020
comment
Ok. 1. Получил несколько реализованных функций Function ‹Flux‹ ... ›, Flux‹ ... ››. 2. Я могу использовать связыватели Spring Cloud Streams для привязки этих функций к брокерам сообщений, просто настраивая их. Таким образом, реализация настолько проста, насколько это возможно. Все отлично 3. Я хочу сделать брокера регулирования - ›(прямой канал) -› FN1 - ›(ExecutionChannel) -› FN2. Я хочу сохранить гибкость Spring-Cloud-функций и повторно использовать уже реализованный исполняющий канал для дросселирования, потому что это здорово. решение. Просто кодируйте все - хорошо, но m.b. есть более изящное решение? - person Elijah; 13.04.2020
comment
Вы настаиваете на том, чтобы у вас был executorChannel, не осознавая, что параллелизм потребителей уже обрабатывается фреймворком - cloud.spring.io/spring-cloud-static/spring-cloud-stream/. Просмотрите документацию. Кроме того, это не значит, что вам нужен канал исполнителя. Это лишь одна из многих деталей реализации. У вас есть бизнес-кейс, который, как я полагаю, требует параллелизма, поэтому подумайте о том, чтобы изложить свой кейс, чтобы мы могли лучше посоветовать вам правильный путь в весеннем портфеле. - person Oleg Zhurakousky; 13.04.2020
comment
Я не могу подтвердить, но когда я реализовал простой HTTP-запрос внутри функции Flux (например), независимо от параметра параллелизма, все запросы обрабатывались в одном потоке. Журналы показывают, что было несколько потоков-потребителей, но обработка была однопоточной. - person Elijah; 13.04.2020
comment
Когда вы имеете дело с реактивными функциями, она отличается от императивной, поскольку реактивная функция вызывается только один раз для подключения и запуска потока. Единицей обработки в императивной функции является отдельное сообщение, в то время как в реактивной - весь поток. Причина, по которой я говорю все это, заключается в том, что, хотя реактивная функция дает вам больше возможностей для обработки сложных вариантов использования, она также достигается за счет того, что определенная ответственность ложится на пользователя, включая парралелизацию - Flux.parallel() - person Oleg Zhurakousky; 14.04.2020
comment
Олег, спасибо. Итак, я знал о конкретном вызове потока, поэтому я думал об executeorChannel как о простом механизме для организации дросселирования (перемещения императивного кода в пул потоков). Я попробую для этого параллель. - person Elijah; 14.04.2020