Как получить доступ к серверу реестра конфлюентной схемы, защищенному паролем, с помощью облачного потока Spring?

Я использую Spring Cloud Stream вместе с реестром схем Aiven, который использует реестр схем confluent. Реестр схем Aiven защищен паролем. На основе этих инструкций. эти два параметра конфигурации должны быть установлены для успешного доступа к серверу реестра схемы.

 props.put("basic.auth.credentials.source", "USER_INFO");
 props.put("basic.auth.user.info", "avnadmin:schema-reg-password");

Все в порядке, когда я использую только драйверы kafka vanilla java, но если я использую облачный поток Spring, я не знаю, как ввести эти два параметра. В данный момент я помещаю "basic.auth.user.info" и "basic.auth.credentials.source" в "spring.cloud.stream.kafka.binder.configuration" файла application.yml.

При этом я получаю "401 Unauthorized" в строке, где схема хочет зарегистрироваться.

Обновление 1:

Основываясь на предложении Алина, я обновил способ настройки компонента SchemaRegistryClient, чтобы он узнал о контексте SSL.

@Bean
public SchemaRegistryClient schemaRegistryClient(
    @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
  try {
    final KeyStore keyStore = KeyStore.getInstance("PKCS12");
    keyStore.load(new FileInputStream(
            new File("path/to/client.keystore.p12")),
        "secret".toCharArray());

    final KeyStore trustStore = KeyStore.getInstance("JKS");
    trustStore.load(new FileInputStream(
            new File("path/to/client.truststore.jks")),
        "secret".toCharArray());

    TrustStrategy acceptingTrustStrategy = (X509Certificate[] chain, String authType) -> true;

    SSLContext sslContext = SSLContextBuilder
        .create()
        .loadKeyMaterial(keyStore, "secret".toCharArray())
        .loadTrustMaterial(trustStore, acceptingTrustStrategy)
        .build();

    HttpClient httpClient = HttpClients.custom().setSSLContext(sslContext).build();
    ClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(
        httpClient);
    ConfluentSchemaRegistryClient schemaRegistryClient = new ConfluentSchemaRegistryClient(
        new RestTemplate(requestFactory));
    schemaRegistryClient.setEndpoint(endpoint);
    return schemaRegistryClient;
  } catch (Exception ex) {
    ex.printStackTrace();
    return null;
  }
}

Это помогло избавиться от ошибки при запуске приложения и зарегистрировать схему. Однако всякий раз, когда приложение хотело отправить сообщение в Kafka, снова выдавалась новая ошибка. Наконец, это также было исправлено ответом Ммельсена.


person Milad    schedule 16.04.2019    source источник
comment
Не могли бы вы поделиться всеми свойствами, которые вы поместили в файл application.yml?   -  person Ali    schedule 17.04.2019
comment
@Milad Вы нашли для этого решение?   -  person mmelsen    schedule 25.04.2019
comment
@mmelsen Я поболтала с Али. Думаю, он нашел решение. Он опубликует это, как только мы в этом убедимся.   -  person Milad    schedule 27.04.2019


Ответы (3)


Я столкнулся с той же проблемой, что и в ситуации, в которой я оказался, при подключении к защищенному реестру схем, размещенному на сервере aiven и защищенному базовой аутентификацией. Чтобы заставить его работать, мне пришлось настроить следующие свойства:

spring.kafka.properties.schema.registry.url=https://***.aiven***.com:port
spring.kafka.properties.basic.auth.credentials.source=USER_INFO
spring.kafka.properties.basic.auth.user.info=username:password

другие свойства моего связующего:

spring.cloud.stream.binders.input.type=kafka
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.brokers=https://***.aiven***.com:port <-- different from the before mentioned port
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.security.protocol=SSL
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.location=truststore.jks
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.truststore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.type=PKCS12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.location=clientkeystore.p12
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.keystore.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.ssl.key.password=secret
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.binder.configuration.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
spring.cloud.stream.binders.input.environment.spring.cloud.stream.kafka.streams.binder.autoCreateTopics=false

на самом деле происходит то, что облачный поток Spring добавит spring.kafka.properties.basic * в DefaultKafkaConsumerFactory, и это добавит конфигурацию в KafkaConsumer. В какой-то момент во время инициализации Spring kafka создается CachedSchemaRegistryClient, которому предоставляются эти свойства. Этот Клиент содержит метод под названием configureRestService, который проверяет, содержит ли карта свойств «basic.auth.credentials.source». Поскольку мы предоставляем это через spring.kafka.properties, он найдет это свойство и позаботится о создании соответствующих заголовков при доступе к конечной точке реестра схемы.

надеюсь, это сработает и для вас.

Я использую весеннюю облачную версию Greenwich.SR1, spring-boot-starter 2.1.4.RELEASE, avro-версию 1.8.2 и confluent.version 5.2.1

person mmelsen    schedule 01.05.2019
comment
если вы используете это, можете ли вы также сказать мне, как я могу заставить эту схему извлекать из реестра вместо использования локальной? - person tsar2512; 06.12.2020

Конфигурация связующего учитывает только хорошо известные потребительские и производственные свойства.

Вы можете установить произвольные свойства на уровне привязки.

spring.cloud.stream.kafka.binding.<binding>.consumer.configuration.basic.auth...
person Gary Russell    schedule 16.04.2019
comment
переместил параметры в spring.cloud.stream.kafka.bindings.output.producer.configuration, но ошибка + stacktrace не изменилась - person Milad; 17.04.2019

Поскольку Aiven использует SSL для протокола безопасности Kafka, для аутентификации необходимо использовать сертификаты.

Вы можете подписаться на эту страницу Чтобы понять, как это работает. Вкратце, вам нужно выполнить следующую команду для генерации сертификатов и их импорта:

openssl pkcs12 -export -inkey service.key -in service.cert -out client.keystore.p12 -name service_key
keytool -import -file ca.pem -alias CA -keystore client.truststore.jks

Затем вы можете использовать следующие свойства, чтобы использовать сертификаты:

spring.cloud.stream.kafka.streams.binder:
  configuration:
    security.protocol: SSL
    ssl.truststore.location: client.truststore.jks
    ssl.truststore.password: secret
    ssl.keystore.type: PKCS12
    ssl.keystore.location: client.keystore.p12
    ssl.keystore.password: secret
    ssl.key.password: secret
    key.serializer: org.apache.kafka.common.serialization.StringSerializer
    value.serializer: org.apache.kafka.common.serialization.StringSerializer
person Ali    schedule 16.04.2019
comment
это отлично работает для самого Kafka, но не помогает с сервером реестра схемы - person Milad; 17.04.2019