pyspark: функция десериализации avro на фрейме данных не работает, поскольку ожидает список

Функция десериализации avro ожидает байтов в списке и не работает при применении к кадру данных. Работает только с collect (), но драйверу / мастеру не хватает памяти

При использовании Spark версии 2.3.3 с Python 3.6.8 фрейм данных создается из выбора таблицы Hive, в которой есть сериализованное сообщение avro. Затем я использую https://github.com/Landoop/python-serializers.git поскольку он поддерживает десериализацию avro с использованием реестра конфлюентных схем на python 3.x

Пытался применить функцию десериализации к кадру данных, и это не удалось. Работает только в том случае, если я df.collect () и использую цикл for для десериализации каждой записи или работает, если я конвертирую df в rdd.map и десериализую каждую строку. Оба этих случая работают только в тестовом режиме, в продукте он либо OOM, либо работает вечно на данных куста 10 ГБ, работающих на серверах 5node 30GB, 8cpu.

spark = SparkSession \
    .builder \
....
    .enableHiveSupport() \
    .getOrCreate()

df = spark.sql("SELECT * FROM table1")
unbase_df = df.select(unbase64(df.mycolumn1))

client = SchemaRegistryClient(url='1.2.3.4:1234')
serializer = MessageSerializer(client)

##attempt 1##FAILS##
decode_df = df.select(serializer.decode_message('mycolumn1'))
###->ERROR -> 
##attempt 2##FAILS##
decode_df_2 = df.select(serializer.decode_message(b'mycolumn1'))

##attempt 3##WORKS BUT OOM with high volume on master(drivermanager)##
unbase_collect = unbase_df.collect()
decode_list = [serializer.decode_message(msg.mycolumn1) for msg in unbase_collect]

##attempt 4##WORKS BUT RUNS FOR EVER##
def avrodecoder(row):
    decoded_row = serializer.decode_message(row['mycolumn1'])
    return decoded_row

decode_rdd = unbase_df.select("*").rdd.map(avrodecoder)

## After #3 or #4 works I convert back to dataframe with schema
schema = StructType([
    StructField("1column",  StructType([
.......
    StructField("ncolumn", StringType()])

decode_df = spark.createDataFrame(decode_rdd,schema)

Сообщение об ошибке в случае # попытки 1

in decode_message(self, message)
    185             raise SerializerError("message is too small to decode")
    186
--> 187         with ContextBytesIO(message) as payload:
    188             magic, schema_id = struct.unpack('>bI', payload.read(5))
    189             if magic != MAGIC_BYTE:

TypeError: a bytes-like object is required, not 'str'```

Error message in case of #attempt 2
```.....python3.6/site-packages/datamountaineer/schemaregistry/serializers/MessageSerializer.py
in decode_message(self, message)
    188             magic, schema_id = struct.unpack('>bI', payload.read(5))
    189             if magic != MAGIC_BYTE:
--> 190                 raise SerializerError("message does not start with magic byte")
    191             decoder_func = self._get_decoder_func(schema_id, payload)
    192             return decoder_func(payload)

SerializerError: the message does not start with a magic byte ```
  1. Как я могу десериализовать avro через реестр конфлюентных схем непосредственно на фреймворке данных
  2. Как я могу убедиться, что все преобразования выполняются только для рабочих / исполнителей
  3. Как я могу заставить его работать достаточно эффективно, чтобы он не OOM и не работал 5,6+ часов для ‹10 ГБ данных
  4. Не понимаю логики того, почему график «Yarn Pending Memory» показывает, что в обоих рабочих случаях он поднимается до 7 + ТБ или даже выше.

person Enjay Jack    schedule 14.06.2019    source источник


Ответы (1)


Прежде чем вы сможете применить простую функцию Python к Column, вы должны преобразовать ее в функцию, определяемую пользователем (UDF):

from pyspark.sql.functions import udf

@udf(decoded_row_schema)
def avrodecoder(row):
    decoded_row = serializer.decode_message(row['mycolumn1'])
    return decoded_row

где decoded_row_schema описывает форму возвращаемого объекта.

Однако, если вы используете текущую версию (> = 2.4.0), это может вообще не понадобиться - Pyspark 2.4.0, прочтите avro из kafka с потоком чтения - Python

person user11646763    schedule 14.06.2019
comment
Я попробовал, во-вторых, предложить spark_avro на Spark 2.4.x. Определена функция from_avro, аналогичная вашему примеру. Но выводит ошибку unbase64_df.show (1) | [00 00 00 01 76 4 ... | `` avro_type_struct = {....} avro_df = unbase64_raw_df.select (from_avro ('unbase_raw_event', avro_type_struct)) `` `` ОШИБКА py4j.protocol.Py4JJavaError: .. во время вызова o77.showString. : org.apache.spark.SparkException: задание прервано из-за сбоя этапа: задача 17 на этапе 24.0: потерянная задача 17.3 на этапе 24.0 (TID 347, главный, исполнитель 2): org.apache.avro.AvroRuntimeException: искаженные данные. Длина отрицательная: -1 - person Enjay Jack; 15.06.2019