искровая дельта перезаписывает определенный раздел

Итак, у меня есть фрейм данных, в котором есть столбец file_date. Для данного прогона фрейм данных содержит данные только для одного уникального file_date. Например, в прогоне предположим, что имеется около 100 записей с file_date, равным 2020_01_21.

Я пишу эти данные, используя следующие

(df
 .repartition(1)
 .write
 .format("delta")
 .partitionBy("FILE_DATE")
 .mode("overwrite")
 .option("overwriteSchema", "true")
 .option("replaceWhere","FILE_DATE=" + run_for_file_date)
 .mode("overwrite")
 .save("/mnt/starsdetails/starsGrantedDetails/"))

Мое требование - создавать папку / раздел для каждого FILE_DATE, так как есть большая вероятность, что данные для определенного file_date будут перезапущены, а данные конкретного file_date должны быть перезаписаны. К сожалению, в приведенном выше коде, если я не помещаю опцию «replaceWhere», он просто перезаписывает данные и для других разделов, но если я напишу вышеупомянутое, данные, похоже, правильно перезаписывают конкретный раздел, но каждый раз, когда запись выполняется, Я получаю следующую ошибку.

Обратите внимание, я также установил следующую конфигурацию искры перед записью:

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")

Но я все еще получаю следующую ошибку:

AnalysisException: "Data written out does not match replaceWhere 'FILE_DATE=2020-01-19'.\nInvalid data would be written to partitions FILE_DATE=2020-01-20.;"

Не могли бы вы помочь, пожалуйста.


person SWDeveloper    schedule 22.01.2020    source источник
comment
такая же проблема здесь   -  person Mehdi TAZI    schedule 04.06.2020
comment
Сэмюэл Лью - Не знаю, почему мой комментарий был удален ... пожалуйста, дайте мне знать   -  person Arun S    schedule 26.06.2020
comment
вы нашли решение для этого? пожалуйста, дайте мне знать   -  person Arun S    schedule 26.06.2020
comment
spark.conf.set (spark.sql.sources.partitionOverwriteMode, dynamic) работает только для паркетной таблицы.   -  person Ali Hasan    schedule 08.12.2020


Ответы (2)


При использовании replaceWhere для перезаписи дельта-раздела необходимо помнить о нескольких вещах. Ваш фрейм данных должен быть отфильтрован перед записью в разделы, например, у нас есть фрейм данных DF:

введите описание изображения здесь

Когда мы записываем этот фрейм данных в дельта-таблицу, тогда диапазон coulmn раздела фрейма данных должен быть отфильтрован, что означает, что у нас должны быть только значения столбцов раздела в пределах нашего диапазона условий replaceWhere.

 DF.write.format("delta").mode("overwrite").option("replaceWhere",  "date >= '2020-12-14' AND date <= '2020-12-15' ").save( "Your location")

если мы используем дату условия ‹'2020-12-15' вместо date‹ = '2020-12-15', это даст нам ошибку:

введите описание изображения здесь

Другое дело, что значение столбца раздела необходимо в цитате «2020-12-15», иначе есть вероятность, что это приведет к ошибке.

Также открыт запрос на включение для дельта-перезаписи разделаspark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic") здесь https://github.com/delta-io/delta/pull/371 не уверен, планируют ли они это внедрить.

person Ali Hasan    schedule 15.12.2020
comment
Это следует отметить как правильный ответ. Спасибо али - person saranya elumalai; 25.03.2021

Я столкнулся с тем же и понял, что в моем фрейме данных было больше значений для столбца, которым я пытался заменить раздел. Вы должны сначала отфильтровать фрейм данных и подготовить replaceWhere и написать.

person Arvind    schedule 10.08.2020