KafkaAvroSerializer с несколькими URL-адресами реестра avro

у нас есть KafkaAvroSerde настроенный с несколькими URL-адресами avroregistry. В какой-то момент serde получил тайм-аут при попытке зарегистрировать схему на 1 URL-адресе, но поскольку он выдал исключение ввода-вывода до потокового приложения, поток потока закрылся. С точки зрения потокового приложения kafka, это противоречит цели поддержки нескольких URL-адресов при создании серверов avro, поскольку исключение времени выполнения, всплывающее в стеке API DSL, закроет поток потока. пара вопросов:

  1. Есть ли хороший способ справиться с этим?
  2. Нужно ли принудительно выполнять повторную попытку в логике приложения (что может быть сложно, когда вы просто материализуете тему в хранилище)?
  3. В противном случае существует ли оболочка avroserde, которая
    могла бы повторить попытку с фактическими URL-адресами configure avroRegistry?
  4. При материализации в локальном хранилище rocksDB есть ли добавленное
    значение для регистрации схемы в реестре или нам следует настроить auto.register.schemas на false?

>

Exception in thread "mediafirst-npvr-adapter-program-mapping-mtrl02nsbe02.pf.spop.ca-f5e097bd-ff1b-42da-9f7d-2ab9fa5d2b70-GlobalStreamThread" org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"ProgramMapp
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Register operation timed out; error code: 50002; error code: 50002
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:191)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:218)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:307)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:299)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:294)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:61)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:100)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:79)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
at org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:178)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:68)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore$1.innerValue(MeteredKeyValueBytesStore.java:57)
at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:199)
at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:121)
at com.bell.cts.commons.kafka.store.custom.CustomStoreProcessor.process(CustomStoreProcessor.java:37)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl.forward(GlobalProcessorContextImpl.java:52)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask.update(GlobalStateUpdateTask.java:87)
at org.apache.kafka.streams.processor.internals.GlobalStreamThread$StateConsumer.pollAndUpdate(GlobalStreamThread.java:239)
at org.apache.kafka.streams.processor.internals.GlobalStreamThread.run(GlobalStreamThread.java:282)

person Frederic Tardif    schedule 02.11.2018    source источник


Ответы (1)


С точки зрения потокового приложения kafka, это противоречит цели поддержки нескольких URL-адресов при создании серверов avro, поскольку исключение времени выполнения, всплывающее в стеке API DSL, закроет поток потока.

Я не согласен здесь: с точки зрения Kafka Streams сериализация не удалась, и поэтому приложение необходимо закрыть. Обратите внимание, что Kafka Streams не зависит от используемого вами Serde и, следовательно, не знает, что ваш Serde взаимодействует с реестром схемы и что он может повторить попытку.

Таким образом, Serde отвечает за внутреннюю обработку повторных попыток. Я не знаю обертки, которая делает это, но это не должно быть слишком сложно создать самостоятельно. Я создам внутренний тикет для отслеживания этого запроса функции. Я думаю, что имеет смысл добавить это для нестандартного опыта.

Для RocksDB: все записи, записанные в RocksDB, также записываются в тему журнала изменений. Таким образом, чтобы позволить Kafka Streams читать эти данные для восстановления состояния после ошибки, вам необходимо зарегистрировать схемы.

person Matthias J. Sax    schedule 02.11.2018
comment
ваше право;) Я не должен говорить с точки зрения приложения kafka stream, а скорее с точки зрения пользователя, использующего поток Kafka в сочетании с avro serdes;). Меня просто беспокоит возможная единственная точка отказа, которую реестр avro может привести к распределенной системе kafka, использующей avro. - person Frederic Tardif; 06.11.2018
comment
Это справедливое беспокойство, и я согласен, что его следует улучшить. Однако на правом слое. Это не проблема Kafka Streams, а проблема AvroSerde. - person Matthias J. Sax; 06.11.2018
comment
Для записей здесь находится внутренний билет github.com/confluentinc /схема-реестр/вопросы/928 - person Jérémie Bolduc; 04.03.2020