pyspark - получение последней секции из логики секционированных столбцов Hive

Я новичок в pySpark. Я пытаюсь получить последний раздел (раздел даты) таблицы улья, используя PySpark-dataframes, и делаю это, как показано ниже. Но я уверен, что есть лучший способ сделать это, используя функции фрейма данных (а не написав SQL). Не могли бы вы поделиться мнениями о лучших способах.

Это решение сканирует все данные в таблице Hive, чтобы получить их.

df_1 = sqlContext.table("dbname.tablename");

df_1_dates = df_1.select('partitioned_date_column').distinct().orderBy(df_1['partitioned_date_column'].desc())

lat_date_dict=df_1_dates.first().asDict()

lat_dt=lat_date_dict['partitioned_date_column']

person vinu.m.19    schedule 07.03.2019    source источник
comment
Используйте show partitions dbname.tablename и выберите последнюю строку возвращаемого фрейма данных, чтобы получить последний раздел.   -  person philantrovert    schedule 08.03.2019
comment
К вашему сведению, на трекере Spark есть проблема по этому поводу. SPARK-12890: запрос Spark SQL, относящийся только к полям раздела, не должен сканировать все данные.   -  person Nick Chammas    schedule 23.12.2020


Ответы (2)


Я согласен с @philantrovert, что упомянул в комментарии. Вы можете использовать нижеприведенный подход для сокращения разделов, чтобы отфильтровать и ограничить количество разделов, сканируемых для вашей таблицы улья.

>>> spark.sql("""show partitions test_dev_db.newpartitiontable""").show();
+--------------------+
|           partition|
+--------------------+
|tran_date=2009-01-01|
|tran_date=2009-02-01|
|tran_date=2009-03-01|
|tran_date=2009-04-01|
|tran_date=2009-05-01|
|tran_date=2009-06-01|
|tran_date=2009-07-01|
|tran_date=2009-08-01|
|tran_date=2009-09-01|
|tran_date=2009-10-01|
|tran_date=2009-11-01|
|tran_date=2009-12-01|
+--------------------+

>>> max_date=spark.sql("""show partitions test_dev_db.newpartitiontable""").rdd.flatMap(lambda x:x).map(lambda x : x.replace("tran_date=","")).max()
>>> print max_date
2009-12-01
>>> query = "select city,state,country from test_dev_db.newpartitiontable where tran_date ='{}'".format(max_date)

>>> spark.sql(query).show();
+--------------------+----------------+--------------+
|                city|           state|       country|
+--------------------+----------------+--------------+
|         Southampton|         England|United Kingdom|
|W Lebanon        ...|              NH| United States|
|               Comox|British Columbia|        Canada|
|           Gasperich|      Luxembourg|    Luxembourg|
+--------------------+----------------+--------------+

>>> spark.sql(query).explain(True)
== Parsed Logical Plan ==
'Project ['city, 'state, 'country]
+- 'Filter ('tran_date = 2009-12-01)
   +- 'UnresolvedRelation `test_dev_db`.`newpartitiontable`

== Analyzed Logical Plan ==
city: string, state: string, country: string
Project [city#9, state#10, country#11]
+- Filter (tran_date#12 = 2009-12-01)
   +- SubqueryAlias newpartitiontable
      +- Relation[city#9,state#10,country#11,tran_date#12] orc

== Optimized Logical Plan ==
Project [city#9, state#10, country#11]
+- Filter (isnotnull(tran_date#12) && (tran_date#12 = 2009-12-01))
   +- Relation[city#9,state#10,country#11,tran_date#12] orc

== Physical Plan ==
*(1) Project [city#9, state#10, country#11]
+- *(1) FileScan orc test_dev_db.newpartitiontable[city#9,state#10,country#11,tran_date#12] Batched: true, Format: ORC, Location: PrunedInMemoryFileIndex[hdfs://xxx.host.com:8020/user/xxx/dev/hadoop/database/test_dev..., PartitionCount: 1, PartitionFilters: [isnotnull(tran_date#12), (tran_date#12 = 2009-12-01)], PushedFilters: [], ReadSchema: struct<city:string,state:string,country:string>

Вы можете видеть в приведенном выше плане, что PartitionCount: 1 просканировал только один раздел из 12 доступных разделов.

person vikrant rana    schedule 10.08.2019

Основываясь на ответе Викранта, вот более общий способ извлечения значений столбцов раздела непосредственно из метаданных таблицы, который позволяет избежать сканирования Spark всех файлов в таблице.

Во-первых, если ваши данные еще не зарегистрированы в каталоге, вы захотите сделать это, чтобы Spark мог видеть детали раздела. Здесь я регистрирую новую таблицу с именем data.

spark.catalog.createTable(
    'data',
    path='/path/to/the/data',
    source='parquet',
)
spark.catalog.recoverPartitions('data')
partitions = spark.sql('show partitions data')

Однако, чтобы показать автономный ответ, я вручную создам partitions DataFrame, чтобы вы могли видеть, как он будет выглядеть, а также решение для извлечения из него определенного значения столбца.

from pyspark.sql.functions import (
    col,
    regexp_extract,
)

partitions = (
    spark.createDataFrame(
        [
            ('/country=usa/region=ri/',),
            ('/country=usa/region=ma/',),
            ('/country=russia/region=siberia/',),
        ],
        schema=['partition'],
    )
)
partition_name = 'country'

(
    partitions
    .select(
        'partition',
        regexp_extract(
            col('partition'),
            pattern=r'(\/|^){}=(\S+?)(\/|$)'.format(partition_name),
            idx=2,
        ).alias(partition_name),
    )
    .show(truncate=False)
)

Результат этого запроса:

+-------------------------------+-------+
|partition                      |country|
+-------------------------------+-------+
|/country=usa/region=ri/        |usa    |
|/country=usa/region=ma/        |usa    |
|/country=russia/region=siberia/|russia |
+-------------------------------+-------+

Решение в Scala будет очень похоже на это, за исключением того, что вызов regexp_extract() будет выглядеть немного иначе:

    .select(
        regexp_extract(
            col("partition"),
            exp=s"(\\/|^)${partitionName}=(\\S+?)(\\/|$$)",
            groupIdx=2
        ).alias(partitionName).as[String]
    )

Опять же, преимущество запроса значений разделов таким способом заключается в том, что Spark не будет сканировать все файлы в таблице, чтобы получить ответ. Если у вас есть таблица с десятками или сотнями тысяч файлов, вы значительно сэкономите время.

person Nick Chammas    schedule 14.01.2021