Ошибка при записи в HDFS с помощью Kafka HDFS Connect

Я пытаюсь записать данные в формате avro из моего кода Java в Kafka в HDFS с помощью соединителя kafka HDFS, и у меня возникают некоторые проблемы. Когда я использую простую схему и данные, представленные на веб-сайте объединенной платформы, я могу записывать данные в HDFS, но когда я пытаюсь использовать сложную схему avro, я получаю эту ошибку в журналах соединителя HDFS:

ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Did not find matching union field for data: PROD
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:973)
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981)
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981)
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981)
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981)
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:782)
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:346)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Я использую конфлюентную платформу 3.0.0

Мой код Java:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class);
props.put("schema.registry.url", <url>);
// Set any other properties
KafkaProducer producer = new KafkaProducer(props);

Schema schema = new Schema.Parser().parse(new FileInputStream("avsc/schema.avsc"));
DatumReader<Object> reader = new GenericDatumReader<Object>(schema);

InputStream input = new FileInputStream("json/data.json");
DataInputStream din = new DataInputStream(input);
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din);

Object datum = null;
while (true) {
    try {
        datum = reader.read(null, decoder);
    } catch (EOFException e) {
        break;
    }
}

ProducerRecord<Object, Object> message = new ProducerRecord<Object, Object>(topic, datum);
producer.send(message);
producer.close();

Схема (создается из файла avdl):

{
  "type" : "record",
  "name" : "RiskMeasureEvent",
  "namespace" : "risk",
  "fields" : [ {
    "name" : "info",
    "type" : {
      "type" : "record",
      "name" : "RiskMeasureInfo",
      "fields" : [ {
        "name" : "source",
        "type" : {
          "type" : "record",
          "name" : "Source",
          "fields" : [ {
            "name" : "app",
            "type" : {
              "type" : "record",
              "name" : "Application",
              "fields" : [ {
                "name" : "csi_id",
                "type" : "string"
              }, {
                "name" : "name",
                "type" : "string"
              } ]
            }
          }, {
            "name" : "env",
            "type" : {
              "type" : "record",
              "name" : "Environment",
              "fields" : [ {
                "name" : "value",
                "type" : [ {
                  "type" : "enum",
                  "name" : "EnvironmentConstants",
                  "symbols" : [ "DEV", "UAT", "PROD" ]
                }, "string" ]
              } ]
            }
          }, ...

Файл json:

{
  "info": {
    "source": {
      "app": {
        "csi_id": "123",
        "name": "ABC"
      },
      "env": {
        "value": {
          "risk.EnvironmentConstants": "PROD"
        }
      }, ...

Кажется, это проблема со схемой, но я не могу определить проблему.


person iiSGii    schedule 28.07.2016    source источник


Ответы (1)


Я инженер Confluent. Это ошибка в том, как Avro Converter обрабатывает схему union, которая у вас есть для env. Я создал issue-393 для решения этой проблемы. Я также составил запрос на вытягивание с исправлением. Это должно быть скоро объединено.

J

person Jeremy Custenborder    schedule 01.08.2016
comment
Привет, Джереми, спасибо за исправление. Я скачал последний код реестра схем из вашего филиала. Поскольку он еще не включен в пакет confluent, я загрузил код для apache kafka и kafka-hdfs-connect и собрал их локально. При попытке запустить коннектор hdfs возникает ошибка при попытке загрузить файл AvroConverter (который находится в реестре схемы). Могу я узнать, как настроить коннектор, чтобы он мог найти эту банку? - person iiSGii; 03.08.2016