Consumer_failed_message в потоке kafka: записи не отправлены из темы

У меня есть поток, в котором из мэйнфрейма IBM IIDR я отправляю записи в тему Kafka. value_format сообщения, приходящего в тему Kafka, - это AVRO, и ключ также в формате AVRO. Записи пихаются в тему Кафки. У меня есть поток, связанный с этой темой. Но записи в поток не передаются. Пример темы test_iidr -

rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}

Формат value_format в потоке - AVRO, и все имена столбцов проверяются.

Запрос на создание потока -

CREATE STREAM test_iidr (
  col1 STRING, 
  col2 DECIMAL(2,0),
  col3 DECIMAL(1,0),
  iidr_tran_type STRING,
  iidr_a_ccid STRING,
  iidr_a_user STRING,
  iidr_src_upd_ts STRING,
  iidr_a_member STRING) 
WITH (KAFKA_TOPIC='test_iidr', PARTITIONS=1, REPLICAS=3, VALUE_FORMAT='AVRO');

Не удается загрузить в поток из темы, поскольку KEY не упоминается в заявлении WITH? В реестре схемы зарегистрированы субъекты test_iidr-value и test_iidr-key.

key.converter и value.converter в докере Kafka-connect установлены как - org.apache.kafka.connect.json.JsonConverter. Создает ли это JsonConverter эту проблему?

Я создал совершенно другой конвейер с другим потоком и вставил те же данные вручную с помощью операторов insert into. Это сработало. Не работает только поток IIDR и записи в поток не попадают из темы.

Я использую Confluent kafka версии 5.5.0.


person Sanjay Nayak    schedule 30.05.2020    source источник
comment
вопрос решен. Похоже, произошла ошибка десериализации из-за DECIMAL и INT. Источник отправлял значения INT, и у нас был DECIMAL в качестве типа данных для них.   -  person Sanjay Nayak    schedule 01.06.2020


Ответы (1)


JsonConverter в конфигурации подключения вполне может преобразовывать ваши данные Avro в JSON.

Чтобы определить форматы сериализации ключей и значений, вы можете использовать команду PRINT (которую, как я вижу, вы уже выполнили). PRINT при запуске выведет форматы ключей и значений. Например:

ksql> PRINT some_topic FROM BEGINNING LIMIT 1;
Key format: JSON or KAFKA_STRING
Value format: JSON or KAFKA_STRING
rowtime: 5/30/20 7:06:34 PM UTC, key: {"col1": "A", "col2": 1}, value: {"col1": "A", "col2": 11, "col3": 2, "iidr_tran_type": "QQ", "iidr_a_ccid": "0", "iidr_a_user": " ", "iidr_src_upd_ts": "2020-05-30 07:06:33.262931000", "iidr_a_member": " "}

Итак, первое, что нужно проверить, - это форматы, выводимые PRINT для ключа и значения, а затем соответствующим образом обновить оператор CREATE.

Обратите внимание, что ksqlDB еще не поддерживает ключи Avro / Json, поэтому вам может потребоваться / потребуется перераспределить данные, см. https: / /docs.ksqldb.io/en/latest/developer-guide/syntax-reference/#what-to-do-if-your-key-is-not-set-or-is-in-a-different-format < / а>

Дополнительное примечание: если схема для значения хранится в реестре схем, вам не нужно определять столбцы в операторе CREATE, поскольку ksqlDB загрузит столбцы из реестра схемы.

Дополнительное примечание: вам не нужно PARTITIONS=1, REPLICAS=3 в предложении WITH для существующих тем, только если вы хотите, чтобы ksqlDB создал тему для вас.

person Andrew Coates    schedule 02.06.2020
comment
Спасибо за предложение. Я внес изменения. Проблема была в типе данных DECIMAL. источник отправлял как INTEGER вместо DECIMAL, а поток Kafka имел DECIMAL в схеме. Мы обновили это до INT. Это сработало - person Sanjay Nayak; 02.06.2020
comment
Отличный материал. Рад, что у тебя все получилось. Вы можете отметить вопрос как отвеченный? - person Andrew Coates; 03.06.2020