Невозможно подключиться из задания потока данных к реестру схемы, когда реестр схемы требует аутентификации клиента TLS

Я разрабатываю задание GCP Cloud Dataflow, использующее брокер Kafka и реестр схем. Нашему брокеру Kafka и реестру схем требуется сертификат клиента TLS. И я столкнулся с проблемой подключения к реестру схем при развертывании. Любые предложения приветствуются.

Вот что я делаю для работы с потоком данных. Я создаю Consumer Properties для конфигураций TLS.

props.put("security.protocol", "SSL");
props.put("ssl.truststore.password", "aaa");
props.put("ssl.keystore.password", "bbb");
props.put("ssl.key.password", "ccc"));
props.put("schema.registry.url", "https://host:port")
props.put("specific.avro.reader", true);

И обновите Consumer Properties с помощью updateConsumerProperties.

Pipeline p = Pipeline.create(options)
...
.updateConsumerProperties(properties)
... 

Как следует из этого ответа stackoverflow, я также загружаю keyStore и trustStore в локальный каталог и указываю расположение trustStore / keyStore в ConsumerProperties в ConsumerFactory.

Truststore и Google Cloud Dataflow

Pipeline p = Pipeline.create(options)
 ...
 .withConsumerFactoryFn(new MyConsumerFactory(...))
 ...

В ConsumerFactory:

public Consumer<byte[], byte[]> apply(Map<String, Object> config)  {
  // download keyStore and trustStore from GCS bucket 
  config.put("ssl.truststore.location", (Object)localTrustStoreFilePath)
  config.put("ssl.keystore.location", (Object)localKeyStoreFilePath)
  new KafkaConsumer<byte[], byte[]>(config);
}

С этим кодом мне удалось выполнить развертывание, но задание Dataflow получило ошибку проверки сертификата сервера TLS.

Caused by: sun.security.validator.ValidatorException: PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target
        sun.security.validator.PKIXValidator.doBuild(PKIXValidator.java:387)
        sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:292)
        sun.security.validator.Validator.validate(Validator.java:260)
        sun.net.www.protocol.https.HttpsClient.afterConnect(HttpsClient.java:559)
        sun.net.www.protocol.https.AbstractDelegateHttpsURLConnection.connect(AbstractDelegateHttpsURLConnection.java:185)
        sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1513)
        sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1441)
        java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:480)
        sun.net.www.protocol.https.HttpsURLConnectionImpl.getResponseCode(HttpsURLConnectionImpl.java:338)
        io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:208)
        io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:252)
        io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:482)
        io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:475)
        io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:151)
        io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getBySubjectAndId(CachedSchemaRegistryClient.java:230)
        io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getById(CachedSchemaRegistryClient.java:209)
        io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:116)
        io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:88)
        org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:14)
        org.fastretailing.rfid.store.siv.EPCTransactionKafkaAvroDeserializer.deserialize(EPCTransactionKafkaAvroDeserializer.scala:7)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:234)
        org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:176)
        org.apache.beam.runners.dataflow.worker.WorkerCustomSources$UnboundedReaderIterator.start(WorkerCustomSources.java:779)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation$SynchronizedReaderIterator.start(ReadOperation.java:361)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:194)
        org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:159)
        org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:76)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.process(StreamingDataflowWorker.java:1228)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker.access$1000(StreamingDataflowWorker.java:143)
        org.apache.beam.runners.dataflow.worker.StreamingDataflowWorker$6.run(StreamingDataflowWorker.java:967)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        java.lang.Thread.run(Thread.java:745)

Затем я обнаружил, что клиент реестра схемы загружает конфигурации TLS из системного свойства. https://github.com/confluentinc/schema-registry/issues/943

Я протестировал Kafka Consumer с той же конфигурацией и подтвердил, что он работает нормально.

props.put("schema.registry.url", "https://host:port")
props.put("specific.avro.reader", true);
props.put("ssl.truststore.location", System.getProperty("javax.net.ssl.trustStore"));
props.put("ssl.truststore.password", System.getProperty("javax.net.ssl.keyStore"));
props.put("ssl.keystore.location", System.getProperty("javax.net.ssl.keyStore"));
props.put("ssl.keystore.password", System.getProperty("javax.net.ssl.keyStorePassword"));
props.put("ssl.key.password", System.getProperty("javax.net.ssl.key.password"));

Затем я применил тот же подход, что означает применение одинаковых конфигураций TLS к свойствам системы и свойствам потребителя к коду задания Dataflow.

Я указывал пароль в свойствах системы при запуске приложения.

-Djavax.net.ssl.keyStorePassword=aaa \
-Djavax.net.ssl.key.password=bbb \
-Djavax.net.ssl.trustStorePassword=ccc \

Примечание. Я установил системное свойство для trustStore и keyStore в Consumer Factory, поскольку эти файлы загружаются в локальный временный каталог.

config.put("ssl.truststore.location", (Object)localTrustStoreFilePath)
config.put("ssl.keystore.location", (Object)localKeyStoreFilePath)
System.setProperty("javax.net.ssl.trustStore", localTrustStoreFilePath)
System.setProperty("javax.net.ssl.keyStore", localKeyStoreFilePath)

но даже развертывание не удалось с ошибкой тайм-аута.

Exception in thread "main" java.lang.RuntimeException: Failed to construct instance from factory method DataflowRunner#fromOptions(interface org.apache.beam.sdk.options.PipelineOptions)
        at org.apache.beam.sdk.util.InstanceBuilder.buildFromMethod(InstanceBuilder.java:224)
...
Caused by: java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
Caused by: java.lang.IllegalArgumentException: DataflowRunner requires gcpTempLocation, but failed to retrieve a value from PipelineOptions
        at org.apache.beam.runners.dataflow.DataflowRunner.fromOptions(DataflowRunner.java:246)
Caused by: java.lang.IllegalArgumentException: Error constructing default value for gcpTempLocation: tempLocation is not a valid GCS path, gs://dev-k8s-rfid-store-dataflow/rfid-store-siv-epc-transactions-to-bq/tmp. 
        at org.apache.beam.sdk.extensions.gcp.options.GcpOptions$GcpTempLocationFactory.create(GcpOptions.java:255)
...
Caused by: java.lang.RuntimeException: Unable to verify that GCS bucket gs://dev-k8s-rfid-store-dataflow exists.
        at org.apache.beam.sdk.extensions.gcp.storage.GcsPathValidator.verifyPathIsAccessible(GcsPathValidator.java:86)
...
Caused by: java.io.IOException: Error getting access token for service account: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at com.google.auth.oauth2.ServiceAccountCredentials.refreshAccessToken(ServiceAccountCredentials.java:401)
...
Caused by: java.net.SocketException: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at javax.net.ssl.DefaultSSLSocketFactory.throwException(SSLSocketFactory.java:248)
...
Caused by: java.security.NoSuchAlgorithmException: Error constructing implementation (algorithm: Default, provider: SunJSSE, class: sun.security.ssl.SSLContextImpl$DefaultSSLContext)
        at java.security.Provider$Service.newInstance(Provider.java:1617)
...
Caused by: java.io.IOException: Keystore was tampered with, or password was incorrect
    at sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:780)
Caused by: java.security.UnrecoverableKeyException: Password verification failed
    at sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:778)

Я что-то упускаю?


person Yohei Onishi    schedule 08.05.2019    source источник
comment
получал ту же ошибку даже при прямом указании паролей. Таким образом, System.setProperty в Consumer Factory может работать не так, как ожидалось.   -  person Yohei Onishi    schedule 08.05.2019


Ответы (2)


В ConsumerFactoryFn вам нужно скопировать сертификат из некоторого места (например, GCS) в локальный путь к файлу на машине.

В Truststore и Google Cloud Dataflow ConsumerFnFactory, который пользователь write имеет этот фрагмент кода, который извлекает хранилище доверенных сертификатов из GCS:

            Storage storage = StorageOptions.newBuilder()
                    .setProjectId("prj-id-of-your-bucket")
                    .setCredentials(GoogleCredentials.getApplicationDefault())
                    .build()
                    .getService();
            Blob blob = storage.get("your-bucket-name", "pth.to.your.kafka.client.truststore.jks");
            ReadChannel readChannel = blob.reader();
            FileOutputStream fileOuputStream;
            fileOuputStream = new FileOutputStream("/tmp/kafka.client.truststore.jks"); //path where the jks file will be stored
            fileOuputStream.getChannel().transferFrom(readChannel, 0, Long.MAX_VALUE);
            fileOuputStream.close();
            File f = new File("/tmp/kafka.client.truststore.jks"); //assuring the store file exists
            if (f.exists())
            {
                LOG.debug("key exists");

            }
            else
            {
                LOG.error("key does not exist");

            }

Вам нужно будет сделать что-то подобное (это не обязательно должен быть GCS, но он должен быть доступен для всех виртуальных машин, выполняющих ваш конвейер в Google Cloud Dataflow).

person Lukasz Cwik    schedule 08.05.2019
comment
Спасибо, но я уже это сделал. похоже, что луч Apache не поддерживает реестр схем. Я отправлю его после того, как получу ответ от службы поддержки GCP. - person Yohei Onishi; 09.05.2019

Я получил ответ от службы поддержки GCP. Похоже, что Apache Beam не поддерживает реестр схем.

Здравствуйте, специалист по Dataflow перезвонил мне. Сейчас я раскрою то, что они мне сказали.

Ответ на ваш вопрос - нет, Apache Beam не поддерживает реестр схем. Однако они сказали мне, что вы можете реализовать вызовы Schema Registry самостоятельно, поскольку Beam потребляет только необработанные сообщения, и ответственность за то, чтобы делать все, что им нужно с данными, лежит на пользователе.

Это основано на нашем понимании случая, когда вы хотите публиковать сообщения в Kafka, и DF использует эти сообщения, анализируя их на основе схемы из реестра.

Я надеюсь, что эта информация может быть вам полезна, дайте мне знать, если я смогу вам чем-то помочь.

Но задание Dataflow по-прежнему может получать двоичное сообщение в формате Avro. Таким образом, вы внутренне вызываете REST API реестра схемы следующим образом. https://stackoverflow.com/a/55917157

person Yohei Onishi    schedule 13.05.2019