Поток ветки Spring Cloud Stream не работает должным образом

Ниже приведен код ветвления, он направлен только на одну тему (первую). Как я понял, он должен стримить по всем трем темам?

В любом случае я могу транслировать по трем темам с помощью ветки?

@Bean
        public Function<KStream<String, Usesr>, KStream<String, User>[]> testprocess() {

            Predicate<String, User> stream1 = (k, v) -> v != null;
            Predicate<String, User> stream2 = (k, v) -> v != null;
            Predicate<String, User> stream3 = (k, v) -> v != null;

            return input -> input.map(
                    (key, user) -> new KeyValue<String, User>(user.getId(), user))
                    .branch(stream1, stream2, stream3);

Конфигурация для процессора

        testprocess-in-0:
          destination: input.users
        testprocess-out-0:
          destination: users.test.out.0
        testprocess-out-1:
          destination: users.test.out.1
        testprocess-out-2:
          destination: users.test.out.2

person jka    schedule 03.01.2020    source источник


Ответы (1)


Глядя на ваши предикаты, оказывается, что первый предикат всегда побеждает, а другие не получают никаких шансов. В ветвлении Kafka Streams первый предикат, который оценивается как истинный, выполняется успешно, и соответствующая тема получает данные. Вам нужно изменить логику в предикатах, чтобы правильно отображалась правильная тема. Вот пример.

person sobychacko    schedule 03.01.2020
comment
Спасибо @sobychacko за ответ. Чтобы уточнить, мой вариант использования - скопировать входной поток в 3 разных потока без какого-либо фильтра. Не уверен, что могу сделать это в конфигурации: testprocess-in-0: destination: input.users testprocess-out-0: destination: users.test.out.0, users.test.out.1, users.test. вых.2 - person jka; 05.01.2020
comment
Ветвление в основном используется, когда вам нужно разделить входной поток на основе определенной логики фильтрации, выраженной через предикат. Если это не так, и вы просто хотите передать поток как сам по себе в выходную тему, тогда вы можете рассмотреть возможность использования нескольких процессоров или отправки вручную в выходную тему с помощью метода to на KStream. - person sobychacko; 06.01.2020