Pyspark записывает кадр данных в avro, сохраняя последовательность ключевых значений

Я пытаюсь прочитать файл avro с помощью pyspark и отсортировать один из столбцов на основе определенных ключей. Один из столбцов в моем файле avro содержит данные MapType, которые мне нужно отсортировать по ключам. Тестовый avro содержит только одну строку со столбцом сущностей, имеющим MapType данных. Я намерен записать вывод в файл avro, но с порядком ключей. К сожалению, я не могу этого добиться, не уверен, что это вообще возможно в avro? Он записывает обратно тем же образом, что и входные данные. Вот мой код (я создал блокнот, чтобы проверить его):

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, lit, to_json, create_map, from_json
from pyspark.sql import Row
from pyspark import StorageLevel
import json
from pyspark.sql.types import StringType
import shutil
from pyspark.sql.types import MapType, ArrayType, StringType, StructType, StructField

spark = SparkSession     .builder     .appName("AvroTest")     .config("spark.jars.packages", "org.apache.spark:spark-avro_2.11:2.4.0")     .getOrCreate()

df = spark.read.format("avro").load("part-r-00000.avro")
schema = df.select('entities').schema
sch = schema.fields[0].dataType
print(df.schema)

@udf
def udf_func(line):
    for entkey,subdict in line.items():
        subdictnew = subdict.asDict(True)
        sorteddict = dict(sorted(subdictnew['entities'].items(), key=lambda a: int(a[0])))
        subdictnew['entities'] = sorteddict
        line[entkey] = subdictnew
    return str(line)

dfnew = df.withColumn('entities', from_json(udf_func(df['entities']), sch)).persist(StorageLevel.MEMORY_ONLY_SER)
#dfnew.show()
d = dfnew.dtypes
newschema = dfnew.schema

try:
    shutil.rmtree('testavro/sortedData')
except:
    print('folder already removed')
dfnew.write.format('avro').save('ctipavro/sortedData')
dfnew.show(1, False)

Приведенный выше код записывает avro обратно, но в несортированном виде. Последняя строка выводит запись столбца фрейма данных для «сущностей» в отсортированном виде.

|37321431529|37321431529|1561020714|[trade -> [trade, [59489777 -> [TRADE_ASSOC_TO_DB_DT -> 2011-09-30, FCBA_IN -> N, ACCT_BALANCE_AM -> 0, CII_BKRPT_CD ->   , CREDIT_AM_EXCP_CD -> 6, FRAUD_IN -> N, ACCT_REPORTED_DT -> 2019-04-01, DATA_USAGE_EXCL_IN -> N, CII_REAFF_CD ->   , DEDUP_RANK_CD -> 0, NY_DISPLAY_RULE_IN -> N, ACCT_HIGH_BALANCE_AM_EXCP_CD -> 6, ACCT_PAYMENT_AM -> 13, EXCLUSION_CD -> 0, KOB_CD -> BB, PAYMENT_GRID_2 -> 0000000-0-0000-00-00000..............

Обратите внимание, здесь я печатаю вывод фрейма данных, который уже был отсортирован. Но когда я пытаюсь прочитать сохраненный файл avro обратно в новый фрейм данных и выполнить show(), ключи снова не отсортированы. Обратите внимание на первый ключ для trade -> [trade, он должен был быть 59489777, тогда как это что-то другое - 51237292611. Кстати, этот ключ появлялся, когда я впервые читал ввод avro, не знаю, почему после сортировки и обратной записи он сначала печатает тот же ключ:

dffresh = spark.read.format("avro").load("testavro/sortedData")
schema = dffresh.schema
print(schema)
dffresh.show(1, False)

Выход:

|37321431529|37321431529|1561020714|[trade -> [trade, [51237292611 -> [TRADE_ASSOC_TO_DB_DT -> 2014-09-20, FCBA_IN -> N, ACCT_BALANCE_AM -> 0, CII_BKRPT_CD ->   , CREDIT_AM_EXCP_CD -> 6, FRAUD_IN -> N, ACCT_REPORTED_DT -> 2019-05-01, DATA_USAGE_EXCL_IN -> N, CII_REAFF_CD ->   , DEDUP_RANK_CD -> 0, NY_DISPLAY_RULE_IN -> N, ACCT_HIGH_BALANCE_AM_EXCP_CD -> 6, ACCT_PAYMENT_AM -> 0, EXCLUSION_CD -> 0, KOB_CD -> BC, PAYMENT_GRID_2 -> 000000C0000000..................................

Я бы попросил кого-нибудь, пожалуйста, помогите мне. Я пробовал множество способов и искал несколько вопросов SO и не мог найти подсказку о том, как этого добиться.


person ArinCool    schedule 11.06.2020    source источник
comment
Кто-нибудь может помочь?   -  person ArinCool    schedule 11.06.2020
comment
Можете ли вы прикрепить образцы данных и схему avro, связанные с этим? особенно этот файл part-r-00000.avro и схема для него   -  person Som    schedule 15.06.2020


Ответы (1)


Если ваши исходные данные находятся в формате avro, рекомендуется записывать обработанный вывод в формате файла Parquet. Вы получаете преимущество предиката pushdown и всегда можете обрабатывать выбранное количество столбцов.

Но если запись в формате avro снова является частью вашего процесса, порядок столбцов не всегда гарантируется, поскольку используется структура данных Map. Вы можете смягчить это, используя функцию select и считывая столбцы в выбранном вами порядке.

person Yayati Sule    schedule 16.06.2020
comment
Спасибо @Yayati. Моя проблема в том, что один из столбцов содержит данные json, и в нем есть несколько атрибутов, которые мне нужно отсортировать. Мой вопрос: если я использую функцию select, будут ли атрибуты в данных столбца отображаться отсортированными? - person ArinCool; 17.06.2020
comment
Да, они будут отображаться отсортированными - person Yayati Sule; 17.06.2020
comment
Нет, это не так. Я попробовал. Проблема в том, что когда вновь созданный avro сохраняется, а затем читается, данные не отображаются отсортированными. Этот код сохраняет новый avro и печатает данные из фрейма данных, они отображаются отсортированными - dfnew.write.format('avro').save('ctipavro/sortedData') dfnew.show(1, False). При этом не появляется отсортировано: dffresh = spark.read.format("avro").load("ctipavro/sortedData") dffresh.select('entities').show(1, False). Два вывода выглядят так же, как я указал в своем вопросе. Может быть, во время операции записи данные снова становятся несортированными? - person ArinCool; 17.06.2020
comment
Нет, я имел в виду, что вы должны явно передать имя столбца в схеме. поскольку объекты представляют собой карту/структуру, вам необходимо явно взорвать значения столбца и выбрать их. Например, explode(col(entities)) as e, а затем df.select($"e.column_name") - person Yayati Sule; 17.06.2020
comment
Он все еще не работает, не могли бы вы предоставить рабочий фрагмент кода, чтобы просто прочитать столбец вложенного типа карты в отсортированном виде. - person ArinCool; 17.06.2020
comment
Я постараюсь когда-нибудь достать тебе - person Yayati Sule; 17.06.2020
comment
Привет! Вы можете следовать этому примеру, чтобы получить представление о функция взрыва - person Yayati Sule; 17.06.2020