pyspark delta lake optimize - не удается проанализировать SQL

У меня есть дельта-таблица, созданная с использованием spark 3.x и delta 0.7.x:

data = spark.range(0, 5)
data.write.format("delta").mode("overwrite").save("tmp/delta-table")
# add some more files
data = spark.range(20, 100)
data.write.format("delta").mode("append").save("tmp/delta-table")

df = spark.read.format("delta").load("tmp/delta-table")
df.show()

Теперь в журнале создается довольно много файлов (во многих случаях слишком маленькие паркетные файлы).

%ls tmp/delta-table

Я хочу их сжать:

df.createGlobalTempView("my_delta_table")
spark.sql("OPTIMIZE my_delta_table ZORDER BY (id)")

не работает с:

ParseException: 
mismatched input 'OPTIMIZE' expecting {'(', 'ADD', 'ALTER', 'ANALYZE', 'CACHE', 'CLEAR', 'COMMENT', 'COMMIT', 'CREATE', 'DELETE', 'DESC', 'DESCRIBE', 'DFS', 'DROP', 'EXPLAIN', 'EXPORT', 'FROM', 'GRANT', 'IMPORT', 'INSERT', 'LIST', 'LOAD', 'LOCK', 'MAP', 'MERGE', 'MSCK', 'REDUCE', 'REFRESH', 'REPLACE', 'RESET', 'REVOKE', 'ROLLBACK', 'SELECT', 'SET', 'SHOW', 'START', 'TABLE', 'TRUNCATE', 'UNCACHE', 'UNLOCK', 'UPDATE', 'USE', 'VALUES', 'WITH'}(line 1, pos 0)

== SQL ==
OPTIMIZE my_delta_table ZORDER BY (id)
^^^

Вопрос:

  1. Как я могу заставить это работать (оптимизировать) без сбоя запроса
  2. есть ли более собственный API, чем обращение к текстовому SQL?

Уведомление:

spark is started like this:

import pyspark
from pyspark.sql import SparkSession

spark = pyspark.sql.SparkSession.builder.appName("MyApp") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.7.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

from delta.tables import *

person Georg Heiler    schedule 28.08.2020    source источник


Ответы (2)


OPTIMIZE недоступен в OSS Delta Lake. Если вы хотите сжать файлы, следуйте инструкциям в Компактные файлы раздел. Если вы хотите использовать ZORDER, в настоящее время вам необходимо использовать Databricks Runtime.

person zsxwing    schedule 29.08.2020
comment
Но: списки docs.delta.io/latest/delta-utility.html OPTIMIZE как поддерживается? И даже: spark.sql("OPTIMIZE my_delta_table") терпит неудачу. Однако github.com/delta-io/delta/issues/368 выглядит как будто ваш ответ все еще верен на сегодняшний день. - person Georg Heiler; 30.08.2020
comment
Действительно, подтверждено. Не реализовано. Дополнительные материалы для чтения: github.com/databricks/tech-talks/tree/master/ и youtube.com/watch?v=u1VfOiHVeMI - person Georg Heiler; 30.08.2020
comment
df.repartition(10, col("foo"), col("bar)).sortWithinPartitions, потенциально также repartitionByRange может быть хорошим приближением в OSS. - person Georg Heiler; 30.08.2020

Если вы используете Delta локально, это означает, что вы должны использовать OSS Delta Lake. Команда optimize доступна только для Databricks Delta Lake. Для сжатия файлов в OSS вы можете сделать следующее: https://docs.delta.io/latest/best-practices.html#compact-files

person Cristián Vargas Acevedo    schedule 09.09.2020
comment
Я пытаюсь запустить уплотнение в дельта-таблице (потоковый приемник), где искровое задание выполняет MERGE для этой таблицы каждые 5 минут. Во время выполнения уплотнения в таблице, если в той же таблице происходят обновления, как это работает? Должны ли мы дождаться успешного выполнения уплотнения, прежде чем возобновлять обновления, или Spark сможет им управлять? Кроме того, существуют ли какие-либо рекомендации по учету числа повторных разделов? Я имею в виду, что у меня 1000 маленьких файлов, и что лучше переразбить на 10 или даже на 5? - person Rak; 20.04.2021