Самая полезная функция Spark SQL и DataFrame для расширения встроенных возможностей PySpark.

Введение

PySpark, Python API для Apache Spark, — это мощная платформа для обработки и анализа больших данных. Он предлагает широкий спектр встроенных функций для обработки и преобразования данных. Однако существуют сценарии, в которых этих встроенных функций может быть недостаточно, и требуется применение пользовательской логики к данным. Вот где в игру вступают пользовательские функции PySpark (UDF). В этой статье мы изучим мир пользовательских функций PySpark и узнаем, как использовать их гибкость и масштабируемость для сложных задач обработки данных.

1. Зачем нам нужна пользовательская функция?

Пользовательские функции (UDF) в PySpark позволяют определять собственные пользовательские функции для выполнения операций с отдельными или несколькими столбцами DataFrame. Пользовательские функции предоставляют способ расширить встроенную функциональность PySpark, позволяя применять сложные преобразования, агрегации или вычисления, которые напрямую не поддерживаются существующими функциями.

2. Как создать пользовательскую функцию PySpark?

2.1 Создание фрейма данных

from pyspark.sql import SparkSession, Row

spark = SparkSession.builder.appName("CreateDataFrame").getOrCreate()

data = [
    {"name": "John smith", "age": 30, "city": "new york"},
    {"name": "Alice bond", "age": 35, "city": "San francisco"},
    {"name": "Bob marley", "age": 40, "city": "Chicago"}
]

df = spark.createDataFrame(data=data)
df.show()

2.2 Создание функции Python

Первым шагом в создании UDF является создание функции Python. Фрагмент кода ниже создает функцию convertCase(), которая принимает строковый параметр и преобразует первую букву каждого слова в заглавную.

def convertCase(str):
    result = ""
    if str is not None:
        arr = str.split(" ")
        for x in arr:
            result = result + x[0:1].upper() + x[1:len(x)] + " "
    return result

2.3 Преобразование функции Python в UDF

2.3.1 Использование функции udf()

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Converting function to UDF 
# first parameter is a function, and second one is return type
convertCase = udf(lambda z: convertCase(z), StringType()) # StringType() is the return type

2.3.2 Использование декоратора @udf

@udf(returnType=StringType()) 
def convertCase(str):
    result = ""
    if str is not None:
        arr = str.split(" ")
        for x in arr:
            result = result + x[0:1].upper() + x[1:len(x)] + " "
    return result

3. Использование UDF с PySpark DataFrame

3.1 Использование с select()

df.select(col("age").alias("Age"), \
    convertCase(col("name")).alias("Name"), \
    convertCase(col("city")).alias("City")).show(truncate=False)

3.2 Использование с withColumn()

df.withColumn("Cureated Name", convertCase(col("name"))) \
.show(truncate=False)

4. Как использовать UDF с SQL

4.1 Зарегистрируйте UDF с помощью spark.udf.register() и используйте его в SQL

""" Using UDF on SQL """
spark.udf.register("convertCase", convertCase,StringType())
df.createOrReplaceTempView("NAME_TABLE")
spark.sql("select age, convertUDF(name) as Name from NAME_TABLE") \
     .show(truncate=False)

5. Избегайте исключений

UDF подвержены ошибкам, если они не разработаны тщательно. например, если у вас есть столбец, содержащий значение null в некоторых записях. Если вы посмотрите на функцию convertCase() в разделе 2.2 и 2.3.2, то увидите, что там проверяется, соответствует ли str значение None или нет?

Ниже следует помнить

  • Всегда лучше проверять нуль внутри функции UDF, а не проверять нуль снаружи.
  • В любом случае, если вы не можете выполнить проверку на нуль в UDF, по крайней мере, используйте IF или CASE WHEN для проверки на нуль и вызывайте UDF условно.

6. Проблема производительности при использовании UDF и альтернативных

Пользовательские функции включают сериализацию и десериализацию данных, что может повлиять на общую производительность вашего приложения Spark. Чтобы избежать этого, вы можете использовать векторизованные операции с pandas_udf(), которые одновременно обрабатывают целые столбцы, используя преимущества производительности библиотеки pandas.

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType

# Register the pandas UDF
convertCase = pandas_udf(lambda z: convertCase(z), StringType())

# Apply the pandas UDF to a column
df.withColumn('name', convertCase(df['name'])).show()

Заключение

Хотя PySpark UDF обеспечивает гибкость, всегда рекомендуется использовать встроенные функции Spark SQL, поскольку они оптимизированы. Рассмотрите возможность создания пользовательской функции только в том случае, если она отсутствует в существующей встроенной функции SQL.