Десериализовать Avro Spark

Я отправляю поток данных в Azure EventHub с помощью следующего кода, использующего Microsoft.Hadoop.Avro .. этот код запускается каждые 5 секунд и просто вставляет те же два сериализованных элемента Avro ????????:

  var strSchema = File.ReadAllText("schema.json");
  var avroSerializer = AvroSerializer.CreateGeneric(strSchema);
  var rootSchema = avroSerializer.WriterSchema as RecordSchema;

  var itemList = new List<AvroRecord>();

  dynamic record_one = new AvroRecord(rootSchema);
  record_one.FirstName = "Some";
  record_one.LastName = "Guy";
  itemList.Add(record_one);

  dynamic record_two = new AvroRecord(rootSchema);
  record_two.FirstName = "A.";
  record_two.LastName = "Person";
  itemList.Add(record_two);

  using (var buffer = new MemoryStream())
  {
      using (var writer = AvroContainer.CreateGenericWriter(strSchema, buffer, Codec.Null))
      {
          using (var streamWriter = new SequentialWriter<object>(writer, itemList.Count))
          {
              foreach (var item in itemList)
              {
                  streamWriter.Write(item);
              }
          }
      }

      eventHubClient.SendAsync(new EventData(buffer.ToArray()));
  }

Схема, использованная здесь, снова v. Проста:

{
  "type": "record",
  "name": "User",
  "namespace": "SerDes",
  "fields": [
    {
      "name": "FirstName",
      "type": "string"
    },
    {
      "name": "LastName",
      "type": "string"
    }
  ]
}

Я убедился, что все в порядке, с помощью простого представления в Azure Stream Analytics на портале:

Снимок экрана Stream Analytics

Пока все хорошо, но я не могу, хоть убей, правильно десериализовать это в Databricks, используя команду from_avro() в Scala ..

Загрузить (точно такую ​​же) схему в виде строки:

val sampleJsonSchema = dbutils.fs.head("/mnt/schemas/schema.json")

Настроить EventHub

val connectionString = ConnectionStringBuilder("<CONNECTION_STRING>")
  .setEventHubName("<NAME_OF_EVENT_HUB>")
  .build

val eventHubsConf = EventHubsConf(connectionString).setStartingPosition(EventPosition.fromEndOfStream)
val eventhubs = spark.readStream.format("eventhubs").options(eventHubsConf.toMap).load()

Прочтите данные ..

// this works, and i can see the serialised data
display(eventhubs.select($"body"))

// this fails, and with an exception: org.apache.spark.SparkException: Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
display(eventhubs.select(from_avro($"body", sampleJsonSchema)))

Итак, по сути, то, что здесь происходит ... я сериализую данные с той же схемой, что и десериализация, но что-то искажено ... документация по этому поводу невероятно скудна (очень-очень мало на веб-сайте Microsoft).


person m1nkeh    schedule 07.11.2019    source источник


Ответы (1)


Проблема

После дополнительного расследования (и в основном с помощью этого article) Я обнаружил, в чем была моя проблема: from_avro(data: Column, jsonFormatSchema: String) ожидает формат схемы искры, а не формат схемы avro. Документация по этому поводу не очень ясна.

Решение 1

Databricks предоставляет удобный метод from_avro(column: Column, subject: String, schemaRegistryUrl: String)), который извлекает необходимую схему avro из реестра схем kafka и автоматически преобразует ее в правильный формат.

К сожалению, он недоступен для чистой искры, а также невозможно использовать его без реестра схем kafka.

Решение 2

Используйте преобразование схемы, предоставленное Spark:

// define avro deserializer
class AvroDeserializer() extends AbstractKafkaAvroDeserializer {
  override def deserialize(payload: Array[Byte]): String = {
    val genericRecord = this.deserialize(payload).asInstanceOf[GenericRecord]
    genericRecord.toString
  }
}

// create deserializer instance
val deserializer = new AvroDeserializer()

// register deserializer
spark.udf.register("deserialize_avro", (bytes: Array[Byte]) =>
  deserializer.deserialize(bytes)
)

// get avro schema from registry (but I presume that it should also work with schema read from a local file)
val registryClient = new CachedSchemaRegistryClient(kafkaSchemaRegistryUrl, 128)
val avroSchema = registryClient.getLatestSchemaMetadata(topic + "-value").getSchema
val sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))

// consume data 
df.selectExpr("deserialize_avro(value) as data")
  .select(from_json(col("data"), sparkSchema.dataType).as("data"))
  .select("data.*")
person noscreenname    schedule 03.07.2020
comment
Итак, я предполагаю, что вы действительно используете реестр схемы? по воспоминаниям (сейчас это довольно старый вопрос) я не думаю, что у меня был реестр схемы .. это может означать, что вы, возможно, используете apache kafka? Я дам ему еще один поворот - все еще где-то есть код ???? Я также дважды проверю свою версию Spark - person m1nkeh; 03.07.2020
comment
p.s. я писал свои вещи в PySpark ???? - person m1nkeh; 03.07.2020
comment
from_avro с прямой поддержкой схемы реестр предназначен только для Databricks, насколько я помню ... в наличии Spark требуется схема JSON, которую вы можете получить из реестра через HTTP. - person Alex Ott; 04.07.2020
comment
Да, вы правы, это работает в блокноте с данными, но не в чистом Spark: / - person noscreenname; 04.07.2020
comment
Редактировать с более подробной информацией - person noscreenname; 06.07.2020