Spark sql saveAsTable создает режим добавления таблицы, если новый столбец добавляется в схему avro

Я использую Spark sql DataSet для записи данных в куст. Он отлично работает, если схема такая же, но если я изменю схему avro, добавив новый столбец между ними, она покажет ошибку (схема предоставляется из реестра схем)

Error running job streaming job 1519289340000 ms.0 org.apache.spark.sql.AnalysisException: The column number of the existing table default.sample(struct<collection_timestamp:bigint,managed_object_id:string,managed_object_type:string,if_admin_status:string,date:string,hour:int,quarter:bigint>) doesn't match the data schema(struct<collection_timestamp:bigint,managed_object_id:string,if_oper_status:string,managed_object_type:string,if_admin_status:string,date:string,hour:int,quarter:bigint>);

if_oper_status нужно добавить новый столбец. Пожалуйста, предложите.

StructType struct = convertSchemaToStructType(SchemaRegstryClient.getLatestSchema("simple"));
        Dataset<Row> dataset = getSparkInstance().createDataFrame(newRDD, struct);


        dataset=dataset.withColumn("date",functions.date_format(functions.current_date(), "dd-MM-yyyy"));
        dataset=dataset.withColumn("hour",functions.hour(functions.current_timestamp()));
        dataset=dataset.withColumn("quarter",functions.floor(functions.minute(functions.current_timestamp()).divide(5)));


        dataset
        .coalesce(1)
        .write().mode(SaveMode.Append)
        .option("charset", "UTF8")
        .partitionBy("date","hour","quarter")
        .option("checkpointLocation", "/tmp/checkpoint")
        .saveAsTable("sample");

person Sumit G    schedule 22.02.2018    source источник


Ответы (2)


Мне удалось решить эту проблему, сохранив схему из реестра в файл и указав путь к файлу avro.schema.url =, как показано ниже.

Примечание. Это необходимо сделать до saveAsTable("sample")

dataset.sqlContext().sql("CREATE EXTERNAL TABLE IF NOT EXISTS sample PARTITIONED BY (dt STRING, hour STRING, quarter STRING ) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.avro.AvroSerDe' STORED as INPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat' LOCATION 'hdfs://localhost:9000/user/root/sample'  TBLPROPERTIES ('avro.schema.url'='file://"+file.getAbsolutePath()+"')");
person Sumit G    schedule 01.03.2018

Пожалуйста, перейдите по ссылке: https://github.com/databricks/spark-avro/pull/155 . Для истории коммитов в выпуск 3.1 добавлен PR для поддержки меняющейся схемы Avro. Какую версию spark-avro вы используете в своем коде?

person Vinoth Chinnasamy    schedule 22.02.2018
comment
Версия 4.0.0 ---------- ‹groupId›com.databricks‹/groupId› ‹artifactId›spark-avro_2.11‹/artifactId› ‹version›4.0.0‹/version› - person Sumit G; 22.02.2018
comment
Только что заметил, что вы используете schemaRegistry для получения схемы Avro. Я предположил, что вы читаете файл Avro, используя spark sql. Я думаю, вы читаете файл Avro из темы Kafka со схемой Registry. Я не совсем уверен, поддерживается ли эволюция схемы при использовании Kafka в качестве источника. Я немного покопаюсь в этом и вернусь к вам. - person Vinoth Chinnasamy; 22.02.2018
comment
Вы получили какую-либо подсказку? - person Sumit G; 26.02.2018
comment
Нет, еще нет, извините. :( - person Vinoth Chinnasamy; 27.02.2018