Я использую Spring Cloud Stream вместе с реестром схем Aiven, который использует реестр схем confluent. Реестр схем Aiven защищен паролем. На основе этих инструкций. эти два параметра конфигурации должны быть установлены для успешного доступа к серверу реестра схемы.
props.put("basic.auth.credentials.source", "USER_INFO");
props.put("basic.auth.user.info", "avnadmin:schema-reg-password");
Все в порядке, когда я использую только драйверы kafka vanilla java, но если я использую облачный поток Spring, я не знаю, как ввести эти два параметра. В данный момент я помещаю "basic.auth.user.info"
и "basic.auth.credentials.source"
в "spring.cloud.stream.kafka.binder.configuration"
файла application.yml
.
При этом я получаю "401 Unauthorized"
в строке, где схема хочет зарегистрироваться.
Обновление 1:
Основываясь на предложении Алина, я обновил способ настройки компонента SchemaRegistryClient, чтобы он узнал о контексте SSL.
@Bean
public SchemaRegistryClient schemaRegistryClient(
@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
try {
final KeyStore keyStore = KeyStore.getInstance("PKCS12");
keyStore.load(new FileInputStream(
new File("path/to/client.keystore.p12")),
"secret".toCharArray());
final KeyStore trustStore = KeyStore.getInstance("JKS");
trustStore.load(new FileInputStream(
new File("path/to/client.truststore.jks")),
"secret".toCharArray());
TrustStrategy acceptingTrustStrategy = (X509Certificate[] chain, String authType) -> true;
SSLContext sslContext = SSLContextBuilder
.create()
.loadKeyMaterial(keyStore, "secret".toCharArray())
.loadTrustMaterial(trustStore, acceptingTrustStrategy)
.build();
HttpClient httpClient = HttpClients.custom().setSSLContext(sslContext).build();
ClientHttpRequestFactory requestFactory = new HttpComponentsClientHttpRequestFactory(
httpClient);
ConfluentSchemaRegistryClient schemaRegistryClient = new ConfluentSchemaRegistryClient(
new RestTemplate(requestFactory));
schemaRegistryClient.setEndpoint(endpoint);
return schemaRegistryClient;
} catch (Exception ex) {
ex.printStackTrace();
return null;
}
}
Это помогло избавиться от ошибки при запуске приложения и зарегистрировать схему. Однако всякий раз, когда приложение хотело отправить сообщение в Kafka, снова выдавалась новая ошибка. Наконец, это также было исправлено ответом Ммельсена.