Преобразование org.apache.avro.generic.GenericRecord в org.apache.spark.sql.Row

У меня есть список org.apache.avro.generic.GenericRecord, avro schemaиспользуя это, нам нужно создать dataframe с помощью SQLContext API, для создания dataframe нужно RDD из org.apache.spark.sql.Row и avro schema. Предпосылкой для создания DF является то, что у нас должен быть RDD org.apache.spark.sql.Row, и это может быть достигнуто с помощью приведенного ниже кода, но почему-то он не работает и выдает ошибку, пример кода.

 1. Convert GenericRecord to Row
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
    import org.apache.avro.Schema
    import org.apache.spark.sql.types.StructType
    def convertGenericRecordToRow(genericRecords: Seq[GenericRecord], avroSchema: Schema, schemaType: StructType): Seq[Row] =
    {
      val fields = avroSchema.getFields
      var rows = new Seq[Row]
      for (avroRecord <- genericRecords) {
        var avroFieldsSeq = Seq[Any]();
        for (i <- 0 to fields.size - 1) {
          avroFieldsSeq = avroFieldsSeq :+avroRecord.get(fields.get(i).name)
        }
        val avroFieldArr = avroFieldsSeq.toArray
        val genericRow = new GenericRowWithSchema(avroFieldArr, schemaType)
        rows = rows :+ genericRow
      }
      return rows;
    }

2. Convert `Avro schema` to `Structtype`
   Use `com.databricks.spark.avro.SchemaConverters -> toSqlType` function , it will convert avro schema to StructType

3. Create `Dataframe` using `SQLContext`
   val rowSeq= convertGenericRecordToRow(genericRecords, avroSchema, schemaType)
   val rowRdd = sc.parallelize(rowSeq, 1)
   val finalDF =sqlContext.createDataFrame(rowRDD,structType)

Но выдает ошибку при создании DataFrame. Может кто-нибудь, пожалуйста, помогите мне, что не так в приведенном выше коде. Кроме того, если у кого-то другая логика конвертации и создания dataframe.

Всякий раз, когда я буду вызывать какое-либо действие в Dataframe, он будет выполнять DAG и пытаться создать объект DF, но в этом случае он терпит неудачу с приведенным ниже исключением, поскольку

 ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
 Error :Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, hdpoc-c01-r06-01, executor 1): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
                        at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
                        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)

После этого я пытаюсь указать правильную версию jar в параметре jar отправки spark и с другим параметром как --conf spark.driver.userClassPathFirst=true, но теперь он не работает с MapR как

ERROR CLDBRpcCommonUtils: Exception during init
java.lang.UnsatisfiedLinkError: com.mapr.security.JNISecurity.SetClusterOption(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)
                    at com.mapr.security.JNISecurity.SetClusterOption(Native Method)
                    at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.init(CLDBRpcCommonUtils.java:163)
                    at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<init>(CLDBRpcCommonUtils.java:73)
                    at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<clinit>(CLDBRpcCommonUtils.java:63)
                    at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:69)
                    at java.lang.Class.forName0(Native Method)

Мы используем дистрибутив MapR, и после изменения пути к классу в spark-submit происходит сбой с указанным выше исключением.

Может кто-нибудь, пожалуйста, помогите здесь или мне нужно преобразовать Avro GenericRecord в Spark Row, чтобы я мог создать с ним Dataframe, пожалуйста, помогите
Спасибо.


person Sagar balai    schedule 13.06.2017    source источник
comment
Какая точная ошибка? и, пожалуйста, обновите вопрос, указав образец genericRecords, avroSchema.   -  person Ramesh Maharjan    schedule 13.06.2017
comment
@RameshMaharjan Трассировка стека драйвера: org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 0 на этапе 0.0 завершилась неудачно 4 раза, последний сбой: потеряна задача 0.3 на этапе 0.0 (TID 3, hdpoc-c01-r03-01 , исполнитель 2): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; локальный класс несовместим: потоковое classdesc serialVersionUID = 2, локальный класс serialVersionUID = 1 в java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) в java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)   -  person Sagar balai    schedule 14.06.2017
comment
ошибка выглядит как ошибка версии между источником потоковых данных и кодом преобразования в вашем локальном. Вам придется использовать ту же версию пакета FastDateFormat, что и источник. И, пожалуйста, обновите ошибку в вопросе, чтобы другие тоже могли вам помочь.   -  person Ramesh Maharjan    schedule 14.06.2017
comment
@RameshMaharjan Я обновил все в вопросе. Есть ли другой способ преобразовать строку GenericRecord в строку Spark?   -  person Sagar balai    schedule 15.06.2017


Ответы (3)


Может быть, это поможет кому-то, кто придет в игру чуть позже.

Поскольку spark-avro устарел и теперь интегрирован в Spark, это можно сделать другим способом.

import org.apache.spark.sql.avro._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.encoders.RowEncoder

...

val avroSchema = data.head.getSchema
val sparkTypes = SchemaConverters.toSqlType(avroSchema).dataType.asInstanceOf[StructType]
val converter = new AvroDeserializer(avroSchema, sparkTypes)
val enconder = RowEncoder.apply(sparkTypes).resolveAndBind()

val rows = data.map { record =>
    enconder.fromRow(converter.deserialize(record).asInstanceOf[InternalRow])
}

val df = sparkSession.sqlContext.createDataFrame(sparkSession.sparkContext.parallelize(rows), sparkTypes)
person Grbinho    schedule 31.01.2020
comment
как мы можем сделать это зажигать версию 3.0.1 - person Aman Gupta; 03.12.2020
comment
Теперь класс org.apache.spark.sql.avro.AvroDeserializer является закрытым в spark 3, @Grbinho, не могли бы вы предложить другой способ сделать то же самое в spark 3. - person Aman Gupta; 14.12.2020

При создании кадра данных из RDD [GenericRecord] есть несколько шагов.

  1. Сначала нужно преобразовать org.apache.avro.generic.GenericRecord в org.apache.spark.sql.Row

Используйте com.databricks.spark.avro.SchemaConverters.createConverterToSQL (sourceAvroSchema: Schema, targetSqlType: DataType)

это частный метод в версии spark-avro 3.2. Если у нас такой же или меньше 3.2, скопируйте этот метод в свой собственный класс util и используйте его, иначе используйте его напрямую.

  1. Создайте Dataframe из коллекции Row (rowSeq).

val rdd = ssc.sparkContext.parallelize(rowSeq,numParition) val dataframe = sparkSession.createDataFrame(rowRDD, schemaType)

Это решает мою проблему.

person Sagar balai    schedule 28.03.2018

Надеюсь, это поможет. В первой части вы можете узнать, как преобразовать GenericRecord в Row.

Как преобразовать RDD[GenericRecord] в фрейм данных в scala?

person hlagos    schedule 14.11.2017
comment
На самом деле я пытался объяснить в ссылке ранее, но не получал правильный фрейм данных. Сначала нам нужно изменить RDD [GenericRecord] на RDD [Row], а затем вы можете создать фрейм данных. - person Sagar balai; 28.03.2018
comment
см. ответ ниже. - person Sagar balai; 28.03.2018