Как читать закодированные данные Avro Binary (Base64) в Spark Scala

Я пытаюсь прочитать файл avro, который закодирован в двоичном формате (Base64), и быстро сжатый кот Hadoop в файле avro выглядит так:

Objavro.schema? 
{"type":"record","name":"ConnectDefault","namespace":"xyz.connect.avro","fields": 
[{"name":"service","type":"string"},{"name":"timestamp","type":"long"}, 
{"name":"count","type":"int"},{"name":"encoderKey","type":{"type":"map","values":"string"}}, 
{"name":"schema","type":"string"},{"name":"data","type":"string"}]}>??n]

Мне нужно извлечь и прочитать «схему» и «данные» из вышеуказанного файла. «Схема» связана с «данными», которые имеют несколько полей.

Я пробовал следующие шаги:

1.Чтение бинарного файла

val binaryFilesRDD = sc.binaryFiles("file+0+00724+00731.avro").map { x => ( x._2.toArray) }
binaryFilesRDD: org.apache.spark.rdd.RDD[Array[Byte]] = MapPartitionsRDD[1] at map at 
<console>:24
  1. Преобразование RDD[Array[Byte]] в Array[Byte]
scala> val newArray = binaryFilesRDD.collect().flatten 
    newArray: Array[Byte] = Array(17, 18, 16, 51, 24, 22, 17, 18, 117, 151, 76, 105, 95, 124....
  1. Вызов следующего метода с использованием newArray (т.е. Array[Byte] ) для получения записей из байтов
    def getGenericRecordfromByte(inputData:Array[Byte], inputDataSchema: Schema): GenericRecord = 
    {
        val datareader = new GenericDatumReader[GenericRecord](inputDataSchema)
        val datadecoder = DecoderFactory.get.binaryDecoder(inputData, null)
        datareader.read(null, datadecoder) 
    }

Но я получаю следующие ошибки.

    scala> val newDataRecords = getGenericRecordfromByte(newArray,inputDataSchema)
    org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -40
  at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
  at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
  at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
  at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:363)
  at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:355)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:157)
  at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:193)
  at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
  at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:142)


Пожалуйста посоветуй


person Vicky    schedule 14.10.2019    source источник


Ответы (2)


Вы запускаете искровую оболочку следующим образом:

spark-shell --packages org.apache.spark:spark-avro_2.11:2.4.4

Или, может быть, так:

spark2-shell --packages org.apache.spark:spark-avro_2.11:2.4.4

Затем вы делаете:

spark.read.format("com.databricks.spark.avro").load("/file/path")
person Vitaly Olegovitch    schedule 14.10.2019

Для версии 2.3.x и более ранних (https://github.com/databricks/spark-avro/blob/branch-4.0/README-for-old-spark-versions.md):

spark-shell --packages com.databricks:spark-avro_2.11:4.0.0 ...

затем в вашем коде:

val avro = spark.read.format("com.databricks.spark.avro").load(/path/)
person Marcela Romero    schedule 05.09.2020