У меня есть список org.apache.avro.generic.GenericRecord
, avro schema
используя это, нам нужно создать dataframe
с помощью SQLContext
API, для создания dataframe
нужно RDD
из org.apache.spark.sql.Row
и avro schema
. Предпосылкой для создания DF является то, что у нас должен быть RDD org.apache.spark.sql.Row, и это может быть достигнуто с помощью приведенного ниже кода, но почему-то он не работает и выдает ошибку, пример кода.
1. Convert GenericRecord to Row
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.avro.Schema
import org.apache.spark.sql.types.StructType
def convertGenericRecordToRow(genericRecords: Seq[GenericRecord], avroSchema: Schema, schemaType: StructType): Seq[Row] =
{
val fields = avroSchema.getFields
var rows = new Seq[Row]
for (avroRecord <- genericRecords) {
var avroFieldsSeq = Seq[Any]();
for (i <- 0 to fields.size - 1) {
avroFieldsSeq = avroFieldsSeq :+avroRecord.get(fields.get(i).name)
}
val avroFieldArr = avroFieldsSeq.toArray
val genericRow = new GenericRowWithSchema(avroFieldArr, schemaType)
rows = rows :+ genericRow
}
return rows;
}
2. Convert `Avro schema` to `Structtype`
Use `com.databricks.spark.avro.SchemaConverters -> toSqlType` function , it will convert avro schema to StructType
3. Create `Dataframe` using `SQLContext`
val rowSeq= convertGenericRecordToRow(genericRecords, avroSchema, schemaType)
val rowRdd = sc.parallelize(rowSeq, 1)
val finalDF =sqlContext.createDataFrame(rowRDD,structType)
Но выдает ошибку при создании DataFrame
. Может кто-нибудь, пожалуйста, помогите мне, что не так в приведенном выше коде. Кроме того, если у кого-то другая логика конвертации и создания dataframe
.
Всякий раз, когда я буду вызывать какое-либо действие в Dataframe, он будет выполнять DAG и пытаться создать объект DF, но в этом случае он терпит неудачу с приведенным ниже исключением, поскольку
ERROR TaskSetManager: Task 0 in stage 0.0 failed 4 times; aborting job
Error :Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, hdpoc-c01-r06-01, executor 1): java.io.InvalidClassException: org.apache.commons.lang3.time.FastDateFormat; local class incompatible: stream classdesc serialVersionUID = 2, local class serialVersionUID = 1
at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622)
После этого я пытаюсь указать правильную версию jar в параметре jar отправки spark и с другим параметром как --conf spark.driver.userClassPathFirst=true, но теперь он не работает с MapR как
ERROR CLDBRpcCommonUtils: Exception during init
java.lang.UnsatisfiedLinkError: com.mapr.security.JNISecurity.SetClusterOption(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;)
at com.mapr.security.JNISecurity.SetClusterOption(Native Method)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.init(CLDBRpcCommonUtils.java:163)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<init>(CLDBRpcCommonUtils.java:73)
at com.mapr.baseutils.cldbutils.CLDBRpcCommonUtils.<clinit>(CLDBRpcCommonUtils.java:63)
at org.apache.hadoop.conf.CoreDefaultProperties.<clinit>(CoreDefaultProperties.java:69)
at java.lang.Class.forName0(Native Method)
Мы используем дистрибутив MapR, и после изменения пути к классу в spark-submit происходит сбой с указанным выше исключением.
Может кто-нибудь, пожалуйста, помогите здесь или мне нужно преобразовать Avro GenericRecord в Spark Row, чтобы я мог создать с ним Dataframe, пожалуйста, помогите
Спасибо.