Spring Cloud Stream: несколько связывателей для Kafka Producer и Consumer с отдельной конфигурацией jaas не работают вместе

Я пытаюсь реализовать потребителя Kafka и производителя Kafka в одном приложении загрузки Spring с использованием облака Spring и связующего. Оба работают успешно, если выполняются по отдельности, но если они выполняются вместе, только Kafka Producer может успешно подключиться к кластеру kafka, но Kafka Consumer не может войти в кластер Kafka. Я думаю, что проблема связана с несколькими / разными конфигурациями jaas для производителя и потребителя kafka. Ниже мой файл application.yml

spring:
  cloud:
    stream:
      bindings:
        input:
          group: consumer-tt1
          useNativeEncoding: true
          destination: consumer-topic
          content-type: application/json
          binder: consumer
        output:
          destination: producer-topic
          useNativeEncoding: true
          content-type: application/*+avro
          binder: producer
      binders:
        consumer:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      autoCreateTopics: false
                      autoAddPartitions: false
                      consumer-properties:
                        key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                        value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
                      brokers: xxxxx.tt.com:9092
                      jaas:
                        loginModule: com.sun.security.auth.module.Krb5LoginModule
                        controlFlag: required
                        options:
                          useKeyTab: true
                          storeKey: true
                          keyTab: \src\main\keytab\XXXXXCON.keytab
                          principal: [email protected]
                          doNotPrompt: true
                          refreshKrb5Config: true
                      configuration:
                        application: XXXXXCON
                        sasl:
                          kerberos:
                            realm: tt.com
                            kdc: tt.com
                            service:
                              name: kafka
                          jaas:
                            loginModule: com.sun.security.auth.module.Krb5LoginModule
                            controlFlag: required
                            config:
                              useKeyTab: true
                              storeKey: true
                              keyTab: \src\main\keytab\XXXXXCON.keytab
                              principal: [email protected]
                              doNotPrompt: true
                              refreshKrb5Config: true
                        security:
                          protocol: SASL_SSL
                        ssl:
                          truststore:
                            location: \src\main\keytab\truststore.jks
                            password: 123456789
                            type: JKS
        producer:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      autoCreateTopics: false
                      autoAddPartitions: false
                      producer-properties:
                        key.serializer: org.apache.kafka.common.serialization.StringSerializer
                        value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                        schema.registry.url: http:/kafka-schema:8484
                      brokers: xxxxx.tt.com:9092
                      jaas:
                        loginModule: com.sun.security.auth.module.Krb5LoginModule
                        controlFlag: required
                        options:
                          useKeyTab: true
                          storeKey: true
                          keyTab: \src\main\keytab\XXXXXPRO.keytab
                          principal: [email protected]
                          doNotPrompt: true
                          refreshKrb5Config: true
                      configuration:
                        application:
                           id: XXXXXPRO
                        sasl:
                          kerberos:
                            realm: tt.com
                            kdc: tt.com
                            service:
                              name: kafka
                          jaas:
                            loginModule: com.sun.security.auth.module.Krb5LoginModule
                            controlFlag: required
                            config:
                              useKeyTab: true
                              storeKey: true
                              keyTab: \src\main\keytab\XXXXXPRO.keytab
                              principal: [email protected]
                              doNotPrompt: true
                              refreshKrb5Config: true
                        security:
                          protocol: SASL_SSL
                        ssl:
                          truststore:
                            location: \src\main\keytab\truststore.jks
                            password: 123456789
                            type: JKS
                  schema-registry-client:
                    endpoint: http:/kafka-schema:8484

Если я запускаю этот же application.yml с @EnableBinding (Source.class) или @EnableBinding (Sink.class) в основном классе, он успешно соединяется с кластером kafka как производитель Kafka или потребитель Kafka. Но когда я запускаю этот же application.yml с @EnableBinding (Processor.class), я получаю ошибку ниже с потребителем kafka (производитель Kafka отлично работает и подключается к кластеру Kafka). Проблема только с Kafka Consumer: не авторизован для доступа к теме.

org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [consumer-topic]
2020-03-24 19:45:07.794  WARN 19000 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : No partitions have been retrieved for the topic (consumer-topic). This will affect the health check.
2020-03-24 19:45:07.794  WARN 19000 --- [           main] o.s.c.s.b.k.p.KafkaTopicProvisioner      : The number of expected partitions was: 1, but 0 has been found instead.There will be 1 idle consumers
2020-03-24 19:45:07.796 ERROR 19000 --- [           main] o.s.cloud.stream.binding.BindingService  : Failed to create consumer binding; retrying in 30 seconds

org.springframework.cloud.stream.binder.BinderException: Exception thrown while starting consumer: 
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:435) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:97) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindConsumer(AbstractBinder.java:142) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:144) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:122) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.cloud.stream.binding.BindableProxyFactory.createAndBindInputs(BindableProxyFactory.java:254) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:1.8.0_162]
    at org.springframework.cloud.stream.binding.AbstractBindingLifecycle.start(AbstractBindingLifecycle.java:48) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.start(InputBindingLifecycle.java:34) [spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:893) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:744) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:391) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:312) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1204) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at com.rbc.ess.ESSEventTransform.EssEventTransformApplication.main(EssEventTransformApplication.java:21) ~[classes/:na]
Caused by: java.lang.IllegalArgumentException: A list of partitions must be provided
    at org.springframework.util.Assert.isTrue(Assert.java:118) ~[spring-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:446) ~[spring-cloud-stream-binder-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createConsumerEndpoint(KafkaMessageChannelBinder.java:133) ~[spring-cloud-stream-binder-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindConsumer(AbstractMessageChannelBinder.java:382) ~[spring-cloud-stream-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    ... 24 common frames omitted

Пожалуйста, проверьте и поясните, как передать несколько конфигураций jaas в приложении связывания облачного потока Spring




Ответы (1)


Ваша конфигурация jaas неверна; должен быть

jaas:
  config: com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="\src\main\keytab\XXXXXCON.keytab" principal="[email protected]" doNotPrompt=true refreshKrb5Config=true;

См. документацию.

Я не могу объяснить, почему это вообще работает, когда у вас только одна связка.

Кроме того, ваш useNativeEncoding неверен; должен быть

...
        input:
          consumer:
            useNativeDecoding: true
...
        output:
          producer:
            useNativeEncoding: true
person Gary Russell    schedule 25.03.2020
comment
Я попытался внести рекомендованные выше изменения jaas, но при запуске производителя или потребителя по отдельности или вместе получаю ошибку ниже: Caused by: org.apache.kafka.common.KafkaException: javax.security.auth.login.LoginException: Could not login: the client is being asked for a password, but the Kafka client code does not currently support obtaining a password from the user. not available to garner authentication information from the user at org.apache.kafka.common.network.SaslChannelBuilder.configure(SaslChannelBuilder.java:153) - person jitin sharda; 25.03.2020
comment
Все, что я пытаюсь, - это иметь несколько файлов jaas для нескольких связывателей для аутентификации с темой kafka - person jitin sharda; 25.03.2020
comment
Извините, я не эксперт по SASL / JAAS; Я просто знаю, как свойства Spring сопоставляются со свойствами Kafka и как клиент Kafka анализирует свойство jaas.config. Это похоже на какое-то несоответствие между конфигурацией SASL клиента и сервера. Я предлагаю вам задать новый вопрос с этим в качестве темы. - person Gary Russell; 25.03.2020