Приложение Spring Boot с использованием Spring Cloud Stream Kafka Binder + Kafka Streams Binder не работает - Producer не отправляет сообщения

В моем приложении Spring Boot 2.3.1 с SCS Hoshram.SR6 использовалось приложение Kafka Streams Binder. Мне нужно было добавить производитель Kafka, который будет использоваться в другой части приложения, поэтому я добавил связыватель kafka. Проблема в том, что производитель не работает, выдает исключение:

19:49:40.082 [scheduling-1] [900cdeb11106e199] ERROR o.s.c.stream.binding.BindingService - Failed to create producer binding; retrying in 30 seconds
org.springframework.cloud.stream.provisioning.ProvisioningException: Provisioning exception; nested exception is java.util.concurrent.TimeoutException
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:332)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:148)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.provisionProducerDestination(KafkaTopicProvisioner.java:79)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:222)
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:90)
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152)
    at org.springframework.cloud.stream.binding.BindingService.lambda$rescheduleProducerBinding$4(BindingService.java:336)
    at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:68)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.TimeoutException: null
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:108)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:272)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicAndPartitions(KafkaTopicProvisioner.java:368)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopicIfNecessary(KafkaTopicProvisioner.java:342)
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.createTopic(KafkaTopicProvisioner.java:319)

Это моя конфигурация:

spring:
  cloud:
    function:
      definition: myProducer
    stream:
      bindings:
        myKStream-in-0:
          destination: my-kstream-topic
          producer:
            useNativeEncoding: true
        myProducer-out-0:
          destination: producer-topic
          producer:
            useNativeEncoding: true
      kafka:
        binder:
          brokers: ${kafka.brokers:localhost}
          min-partition-count: 3
          replication-factor: 3
          producerProperties:
            enable:
              idempotence: true
            retries: 0x7fffffff
            acks: all
            key:
              serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
              subject:
                name:
                  strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
            value:
              serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
              subject:
                name:
                  strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
            schema:
              registry:
                url: ${schema-registry.url:http://localhost:8081}
            request:
              timeout:
                ms: 5000
        streams:
          binder:
            brokers: ${kafka.brokers:localhost}
            configuration:
              application:
                id: ${spring.application.name}
                server: ${POD_IP:localhost}:${local.server.port:8080}
              schema:
                registry:
                  url: ${schema-registry.url}
              key:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
              value:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
              processing:
                guarantee: exactly_once
              replication:
                factor: 3
              group:
                id: kpi
            deserialization-exception-handler: logandcontinue
            min-partition-count: 3
            replication-factor: 3
            state-store-retry:
              max-attempts: 20
              backoff-period: 1500

В чем может быть проблема?

ОБНОВЛЕНИЕ

Я настроил конфигурацию следующим образом:

spring:
  cloud:
    function:
      definition: myProducer
    stream:
      function:
        definition: myKStream

Сейчас я не вижу никаких исключений, но сообщения не попадают в тему.

В другом приложении, которое использует только связыватель kafka, он отлично работает:

@Configuration
class KafkaProducerConfiguration {

    @Bean
    fun myProducerProcessor(): EmitterProcessor<Message<XXX>> {
        return EmitterProcessor.create()
    }

    @Bean
    fun myProducer(): Supplier<Flux<Message<XXX>>> {
        return Supplier { myProducerProcessor() }
    }

}

...

@Component
class XXXProducer(@Qualifier("myProducerProcessor") private val myProducerProcessor: EmitterProcessor<Message<XXX>>) {

    fun send(...): Mono<Void> {
        return Mono.defer {          
                myProducerProcessor.onNext(message)
                Mono.empty()
        }
   }

ОБНОВЛЕНИЕ 2

Я установил logging.level.org.springframework.cloud.stream: debug

В журналах появляется следующая трассировка:

o.s.c.s.binder.DefaultBinderFactory - Creating binder: kstream

Однако о Creating binder: kafka нет ничего.




Ответы (1)


Мне не хватало конфигурации нескольких связывателей для kafka и kstreams (https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.0.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_multi_binders_with_kafka_streams_based_binders_and_regular_kafka_binder)

Таким образом, мне пришлось настроить: spring.cloud.stream.function.definition=myProducer;myKStream

person codependent    schedule 26.07.2020
comment
Вы должны отметить свой ответ как принятый. - person Matthias J. Sax; 04.01.2021