Невозможно установить groupId и clientId при использовании Spring Cloud Stream Kafka Binder

У меня серьезные проблемы с Spring Cloud Stream Kafka Binder. Существует много неоднозначности и проблем с согласованностью, связанных с настройками конфигурации Spring Cloud 3.0.2.RELEASE. Я пытался установить идентификаторы групп и идентификаторы клиентов для тем Kafka, но, несмотря на опробование различных комбинаций, мне не удалось правильно настроить идентификатор группы.

В документации утверждается, что мы должны иметь возможность установить идентификатор группы и идентификатор клиента, настроив один из следующих параметров: https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/current/reference/html/spring-cloud-stream.html#binding-properties

spring.cloud.stream.default.group
spring.cloud.stream.default.consumer.group
spring.cloud.stream.kafka.default.consumer.group
spring.cloud.stream.bindings.<channelName>.group

Ни одна из вышеперечисленных конфигураций не работает для установки идентификатора клиента для производителей или идентификатора группы для потребителей. Единственный прогресс, которого я вообще добился, - это установка идентификатора клиента через совершенно другую конфигурацию.

spring.kafka.client-id
spring.kafka.admin.client-id
spring.kafka.producer.client-id

После установки успешной настройки идентификатора клиента с этими настройками я попытался установить идентификатор группы для потребителей, что, на удивление, не сработало.

spring.kafka.group-id   <---- does not exist as a property, but tried this anyway
spring.kafka.consumer.group-id

Изменить: вот настройка приложения.

Application.java

@SpringBootApplication
@EnableSwagger2
public class Application {
  public static void main(String[] args) {
    SpringApplication.run(Application.class, args);
  }

  @Bean
  public Docket swaggerApi() {
    return new Docket(DocumentationType.SWAGGER_2)
        .select()
        .apis(RequestHandlerSelectors.any())
        .paths(regex("^(?!.*error).+$"))
        .build()
        .pathMapping("/");
  }
}

application.yaml

spring:
  cloud:
    stream:
      bindings:
        MyKafkaTopicBinderChannel:
          destination: MyKafkaTopic
          group: MyServiceGroup
      default:
        producer:
          useNativeEncoding: on
        consumer:
          useNativeEncoding: on
        contentType: application/*+avro
      kafka:
        binder:
          brokers: some.broker.io
          producer-properties:
            key:
              serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            value:
              serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema:
              registry:
                url: some.registry.io
          consumer-properties:
            key:
              deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            value:
              deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
            schema:
              registry.url: some.registry.io
            specific:
              avro:
                reader: true

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>io.some.org</groupId>
  <artifactId>MyService</artifactId>
  <version>1.0.0</version>
  <name>chatbotApi</name>
  <description>Spring Boot Service</description>

  <properties>
    <java.version>11</java.version>
    <gson.version>2.8.6</gson.version>
    <springfox.version>2.9.2</springfox.version>
    <swagger-annotations.version>1.6.0</swagger-annotations.version>
    <swagger-models.version>1.6.0</swagger-models.version>
    <jackson-datatype-jsr310.version>2.10.2</jackson-datatype-jsr310.version>
    <avro.version>1.9.2</avro.version>
    <avro-maven-plugin.version>1.9.2</avro-maven-plugin.version>
    <confluent.kafka.version>5.4.0</confluent.kafka.version>
    <kafka-clients.version>2.4.0</kafka-clients.version>
    <spring-cloud.version>3.0.2.RELEASE</spring-cloud.version>
  </properties>

  <repositories>
    <repository>
      <id>confluent</id>
      <url>http://packages.confluent.io/maven/</url>
    </repository>
  </repositories>
  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.4.RELEASE</version>
    <relativePath/>
  </parent>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-configuration-processor</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
      <version>${spring-cloud.version}</version>
    </dependency>
    <dependency>
      <groupId>com.google.code.gson</groupId>
      <artifactId>gson</artifactId>
      <version>${gson.version}</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.datatype</groupId>
      <artifactId>jackson-datatype-jsr310</artifactId>
      <version>${jackson-datatype-jsr310.version}</version>
    </dependency>
    <dependency>
      <groupId>io.springfox</groupId>
      <artifactId>springfox-swagger2</artifactId>
      <version>${springfox.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
        <exclusion>
          <groupId>io.swagger</groupId>
          <artifactId>swagger-annotations</artifactId>
        </exclusion>
        <exclusion>
          <groupId>io.swagger</groupId>
          <artifactId>swagger-models</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>io.springfox</groupId>
      <artifactId>springfox-swagger-ui</artifactId>
      <version>${springfox.version}</version>
    </dependency>
    <dependency>
      <groupId>io.swagger</groupId>
      <artifactId>swagger-annotations</artifactId>
      <version>${swagger-annotations.version}</version>
    </dependency>
    <dependency>
      <groupId>io.swagger</groupId>
      <artifactId>swagger-models</artifactId>
      <version>${swagger-models.version}</version>
    </dependency>

    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-schema-registry-client</artifactId>
      <version>${confluent.kafka.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-avro-serializer</artifactId>
      <version>${confluent.kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>io.confluent</groupId>
      <artifactId>kafka-streams-avro-serde</artifactId>
      <version>${confluent.kafka.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>${kafka-clients.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>${avro.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro-tools</artifactId>
      <version>${avro.version}</version>
      <exclusions>
        <exclusion>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
        </exclusion>
      </exclusions>
    </dependency>

    <dependency>
      <groupId>com.h2database</groupId>
      <artifactId>h2</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
      <exclusions>
        <exclusion>
          <groupId>org.junit.vintage</groupId>
          <artifactId>junit-vintage-engine</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-test-support</artifactId>
      <version>${spring-cloud.version}</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <resources>
      <resource>
        <directory>src/main/resources</directory>
        <excludes>
          <exclude>local.yaml</exclude>
          <exclude>avro/*</exclude>
        </excludes>
        <filtering>true</filtering>
      </resource>
    </resources>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
      <plugin>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-maven-plugin</artifactId>
        <version>${avro-maven-plugin.version}</version>
        <executions>
          <execution>
            <id>schemas</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>schema</goal>
              <goal>protocol</goal>
              <goal>idl-protocol</goal>
            </goals>
            <configuration>
              <sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory>
              <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
            </configuration>
          </execution>
        </executions>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>${java.version}</source>
          <target>${java.version}</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

SpringIntegrationService.java

@Component
@EnableBinding(SpringIntegrationService.KafkaTopicBindings.class)
public class SpringIntegrationService {
  private static Logger logger = LoggerFactory.getLogger(SpringIntegrationService.class);
  private MessageChannel someChannel;

  public interface KafkaTopicBindings {
    String MY_KAFKA_TOPIC_BINDER_CHANNEL = "MyKafkaTopicBinderChannel";

    @Output(KafkaTopicBindings.MY_KAFKA_TOPIC_BINDER_CHANNEL)
    MessageChannel someChannel();
  }

  public SpringIntegrationService(KafkaTopicBindings bindings) {
    this.someChannel = bindings.someChannel();
  }

  @ServiceActivator(inputChannel = "entry.kafka")
  public boolean entryKafka(Message<someChannel> msg) {
    logger.info("entryKafka(): Payload: {}", msg.getPayload());

    try {
      return someChannel.send(MessageBuilder.withPayload(msg.getPayload())
          .setHeader(KafkaHeaders.MESSAGE_KEY, "Some Key").build());
    } catch (Exception e) {
      logger.warn("entryKafka(): Failed to send message onto someChannel topic", e);
      return false;
    }
  }
}

comment
У меня такая же проблема. Решена ли она?   -  person lion_pankaj    schedule 08.06.2020


Ответы (2)


Вот репо, где я попытался обновить приложение, упомянутое в блог. Исправлено несколько вещей в конфигурации и обновлены примеры для использования новой функциональной модели. Я убедился, что это работает. Не могли бы вы запустить его на своем конце и сравнить с вашей настройкой? Если вы сможете использовать этот образец как средство сообщения о любых потенциальных проблемах, это поможет нам в дальнейшем вам помочь.

person sobychacko    schedule 28.02.2020
comment
Я пробовал код в репо, и он действительно отлично работает. Однако вслед за комментарием @Oleg Zhurakousky кажется, что конфигурация отлично работает с реактивной моделью программирования, не будет работать, если вы попытаетесь использовать Spring Cloud Stream Binder Kafka с Spring Integration. - person superflux; 28.02.2020
comment
Для всех, кто после меня, не используйте Spring Integration со Spring Cloud Stream: cloud.spring.io/spring-cloud-static/spring-cloud-stream/ - person superflux; 29.02.2020

Я не уверен, что вам не подходит, и, не увидев от вас полного образца, сказать невозможно. Итак, вот простой пример, который мы можем использовать в качестве отправной точки:

@SpringBootApplication
public class SimpleStreamApplication {

    public static void main(String[] args) throws Exception {
        SpringApplication.run(SimpleStreamApplication.class,
                "--spring.cloud.function.definition=uppercase",
                "--spring.cloud.stream.bindings.uppercase-in-0.destination=uppercase",
                "--spring.cloud.stream.bindings.uppercase-in-0.group=uppercase.group");
    }

   @Bean
    public Function<String, String> uppercase() {
       return v -> v.toUpperCase();
    }
}

Это приведет к правильному созданию / назначению группы потребителей. Вы хотите сказать, что это не работает для вас?

person Oleg Zhurakousky    schedule 27.02.2020
comment
Я уже давно обновил пост. Я также следил за руководством: baeldung.com/spring-cloud-stream -kafka-avro-confluent - person superflux; 27.02.2020
comment
Если вы новичок в Spring-cloud-stream, рассмотрите возможность перехода на модель функционального программирования. Как видите, удалив почти все ссылки на модель аннотативного программирования, мы практически исключили тот подход, который вы сейчас используете. Кроме того, я предоставил вам простейший пример, чтобы начать движение, пока вы ссылаетесь на какой-то блог, что создает некоторую отсоединение, поскольку я не уверен, что именно вы хотите получить от этого взаимодействия. - person Oleg Zhurakousky; 28.02.2020
comment
Извините, я не знал, что содержание в руководстве Baeldung было устаревшим. Я начал с использования руководства Baeldung, потому что в нем было более подробно описано, как что-то запустить и запустить. Я также заметил, что очищенный код, предоставленный @sobychacko, использовал реактивные библиотеки. Мне очень нравятся библиотеки, но я боялся полагаться на версии реактивных драйверов баз данных, которые на данный момент нестабильны. Я перейду на новую парадигму. - person superflux; 28.02.2020
comment
Я пропустил добавление function.definition. Этот пост помог. Спасибо :) - person raksheetbhat; 01.02.2021