Kafka Streams: определите несколько потоков Kafka с помощью Spring Cloud Stream для каждого набора тем.

Я пытаюсь сделать простой POC с помощью Kafka Streams. Однако я получаю исключение при запуске приложения. Я использую Spring-Kafka, Kafka-Streams 2.5.1 с конфигурацией потока Spring boot 2.3.5 Kafka

@Configuration
public class KafkaStreamsConfig {
    private static final Logger log = LoggerFactory.getLogger(KafkaStreamsConfig.class);

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processAAA() {
        return input -> input.peek((key, value) -> log
                .info("AAA Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processBBB() {
        return input -> input.peek((key, value) -> log
                .info("BBB Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processCCC() {
        return input -> input.peek((key, value) -> log
                .info("CCC Cloud Stream Kafka Stream processing : {}", input.toString().length()));
    }

    /*
    @Bean
    public KafkaStreams kafkaStreams(KafkaProperties kafkaProperties) {
        final Properties props = new Properties();
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "groupId-1"););
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
        final KafkaStreams kafkaStreams = new KafkaStreams(kafkaStreamTopology(), props);
        kafkaStreams.start();
        return kafkaStreams;
    }

    @Bean
    public Topology kafkaStreamTopology() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(Arrays.asList(AAATOPIC, BBBInputTOPIC, CCCInputTOPIC));
        return streamsBuilder.build();
    } */
}

application.yaml настроен, как показано ниже. Идея в том, что у меня есть 3 темы ввода и 3 темы вывода. Компонент принимает входные данные из темы ввода и выдает вывод в тему вывода.

spring:
  application.name: consumerapp-1
  cloud:
    function:
      definition: processAAA;processBBB;processCCC
    stream:
      kafka.binder: 
          brokers: 127.0.0.1:9092
          autoCreateTopics: true
          auto-add-partitions: true
      kafka.streams.binder:
          configuration: 
            commit.interval.ms: 1000
            default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
            default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
      bindings:
        processAAA-in-0:
          destination: aaaInputTopic
        processAAA-out-0:
          destination: aaaOutputTopic
        processBBB-in-0:
          destination: bbbInputTopic
        processBBB-out-0:
          destination: bbbOutputTopic
        processCCC-in-0:
          destination: cccInputTopic
        processCCC-out-0:
          destination: cccOutputTopic

Выброшенное исключение

Caused by: java.lang.IllegalArgumentException: Trying to prepareConsumerBinding public abstract void org.apache.kafka.streams.kstream.KStream.to(java.lang.String,org.apache.kafka.streams.kstream.Produced)  but no delegate has been set.
at org.springframework.util.Assert.notNull(Assert.java:201)
at org.springframework.cloud.stream.binder.kafka.streams.KStreamBoundElementFactory$KStreamWrapperHandler.invoke(KStreamBoundElementFactory.java:134)

Может ли кто-нибудь помочь мне с примерами кода Kafka Streams Spring-Kafka для обработки с несколькими темами ввода и вывода.

Обновления: 21 января 2021 г.

После удаления всех конфигураций bean-компонентов kafkaStreams и kafkaStreamsTopology я получаю сообщение ниже в бесконечном цикле. Потребление сообщений по-прежнему не работает. Я проверил подписку в application.yaml с определениями функции @Bean. они все выглядят нормально, но я все равно получаю эту ошибку перекрестной проводки. Я заменил application.properties на application.yaml выше

    [consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1] [Consumer clientId=consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1-consumer, groupId=consumerapp-1] We received an assignment [cccParserTopic-0] that doesn't match our current subscription Subscribe(bbbParserTopic); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription
2021-01-21 14:12:43,336 WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator [consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1] [Consumer clientId=consumerapp-1-75eec5e5-2772-4999-acf2-e9ef1e69f100-StreamThread-1-consumer, groupId=consumerapp-1] We received an assignment [cccParserTopic-0] that doesn't match our current subscription Subscribe(bbbParserTopic); it is likely that the subscription has changed since we joined the group. Will try re-join the group with current subscription



Ответы (2)


Мне удалось решить проблему. Я пишу это для блага других. Если вы хотите включить несколько потоков в единый jar-файл приложения, то ключевым моментом является определение нескольких идентификаторов приложений, по одному на каждый из ваших потоков. Я знал это с самого начала, но не знал, как это определить. Наконец, ответ - это то, что мне удалось найти после прочтения документации SCSt. Ниже показано, как можно определить application.yaml. application.yaml, как показано ниже

spring:
  application.name: kafkaMultiStreamConsumer
  cloud:
    function:
      definition: processAAA; processBBB; processCCC --> // needed for Imperative @StreamListener
    stream:
      kafka: 
        binder:
          brokers: 127.0.0.1:9092
          min-partition-count: 3
          replication-factor: 2
          transaction:
            transaction-id-prefix: transaction-id-2000
          autoCreateTopics: true
          auto-add-partitions: true
        streams:
          binder:
            functions: 
            // needed for functional
              processBBB: 
                application-id: SampleBBBapplication
              processAAA: 
                application-id: SampleAAAapplication
              processCCC: 
                application-id: SampleCCCapplication
            configuration: 
              commit.interval.ms: 1000            
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde        
      bindings:
      // Below is for Imperative Style programming using 
      // the annotation namely @StreamListener, @SendTo in .java class
        inputAAA:
          destination: aaaInputTopic
        outputAAA:
          destination: aaaOutputTopic
        inputBBB:
          destination: bbbInputTopic
        outputBBB:
          destination: bbbOutputTopic
        inputCCC:
          destination: cccInputTopic
        outputCCC:
          destination: cccOutputTopic
     // Functional Style programming using Function<KStream...> use either one of them
     // as both are not required. If you use both its ok but only one of them works
     // from what i have seen @StreamListener is triggered always.
     // Below is from functional style
        processAAA-in-0:
          destination: aaaInputTopic
          group: processAAA-group
        processAAA-out-0:
          destination: aaaOutputTopic
          group: processAAA-group
        processBBB-in-0:
          destination: bbbInputTopic
          group: processBBB-group
        processBBB-out-0:
          destination: bbbOutputTopic
          group: processBBB-group
        processCCC-in-0:
          destination: cccInputTopic
          group: processCCC-group
        processCCC-out-0:
          destination: cccOutputTopic
          group: processCCC-group

Как только выше определено, нам теперь нужно определить отдельные классы Java, в которых реализована логика обработки потока. Ваш класс Java может быть примерно таким, как показано ниже. Создайте аналогично для других 2 или N потоков в соответствии с вашими требованиями. Один из примеров приведен ниже: AAASampleStreamTask.java.

@Component
@EnableBinding(AAASampleChannel.class) // One Channel interface corresponding to in-topic and out-topic
public class AAASampleStreamTask {
    private static final Logger log = LoggerFactory.getLogger(AAASampleStreamTask.class);

    @StreamListener(AAASampleChannel.INPUT)
    @SendTo(AAASampleChannel.OUTPUT)
    public KStream<String, String> processAAA(KStream<String, String> input) {
        input.foreach((key, value) -> log.info("Annotation AAA *Sample* Cloud Stream Kafka Stream processing {}", String.valueOf(System.currentTimeMillis())));
       ...
       // do other business logic
       ...
        return input;
    }
    
    /**
     * Use above or below. Below style is latest startting from ScSt 3.0 if iam not 
     * wrong. 2 different styles of consuming Kafka Streams using SCSt. If we have 
     * both then above gets priority as per my observation
     */     
    /* 
    @Bean
    public Function<KStream<String, String>, KStream<String, String>> processAAA() {
        return input -> input.peek((key, value) -> log.info(
                "Functional AAA *Sample* Cloud Stream Kafka Stream processing : {}", String.valueOf(System.currentTimeMillis())));
       ...
     // do other business logic
       ...
    }
    */
}

Канал необходим, если вы хотите использовать императивное программирование, а не функциональное. AAASampleChannel.java

public interface AAASampleChannel {
    String INPUT = "inputAAA";
    String OUTPUT = "outputAAA";

    @Input(INPUT)
    KStream<String, String> inputAAA();

    @Output(OUTPUT)
    KStream<String, String> outputAAA();
}
person Guru    schedule 22.01.2021

Похоже, вы смешиваете Spring Cloud Stream и Spring Kafka в приложении. При использовании связующего вам не нужно напрямую определять компоненты, требуемые Spring Kafka, такие как KafkaStreams и Topology, скорее они создаются SCSt неявно. Можете ли вы удалить следующие бобы и попробовать еще раз?

@Bean
public KafkaStreams kafkaStreams(KafkaProperties kafkaProperties) {

а также

@Bean
public Topology kafkaStreamTopology() {

Если вы по-прежнему сталкиваетесь с проблемами, поделитесь небольшой выборкой, которую можно воспроизвести, чтобы мы могли провести дальнейшую сортировку.

person sobychacko    schedule 20.01.2021
comment
Спасибо за ответ. Я пытаюсь заставить это работать. Сначала я попробовал программировать с помощью функционального моделирования, а затем с императивным стилем, но и то, и другое не работает. Как ни странно. После внесения предложенных вами изменений. теперь я получаю исключение ниже. Я отредактировал ответ. пожалуйста, проверьте - person Guru; 21.01.2021
comment
я решил, что если я удалю другие 2 beans / темы и просто сохраню одну функцию ‹KStream ...› и 1 тему. Вещи начинают работать. В тот момент, когда я добавляю 3 темы и 3 компонента с функцией ‹›, я получаю это предупреждение. Мы не можем иметь несколько привязок в SCSt. Моя проблема в том, что у меня есть один набор тем для каждого потока, и у нас есть много таких потоков тем, как A-A1-A2, B-B1-B2 и т. Д., Где A, A1, B, B1 и т. Д. - все темы. Для каждого из этих наборов я хотел создать KStream. Разве на банку нам разрешено иметь только 1 поток, а несколько потоков не разрешены. Пожалуйста, дай мне знать - person Guru; 21.01.2021