Обсуждение того, как преобразовать типы данных столбцов в PySpark DataFrames

Введение

Довольно распространенная операция в PySpark - это приведение типа, которое обычно требуется, когда нам нужно изменить тип данных определенных столбцов в DataFrames. Например, довольно часто (и это плохая практика!) Хранить дату в виде строк или даже целых и двойных чисел как StringType.

В сегодняшнем кратком руководстве мы рассмотрим, как изменить типы столбцов определенных столбцов DataFrame в PySpark. В частности, мы обсудим, как это сделать, используя

  • функция cast()
  • функция selectExpr()
  • Spark SQL

Во-первых, давайте создадим пример DataFrame, на который мы будем ссылаться в этой статье, чтобы продемонстрировать несколько концепций.

from pyspark.sql import SparkSession
# Create an instance of spark session
spark_session = SparkSession.builder \
    .master('local[1]') \
    .appName('Example') \
    .getOrCreate()
df = spark_session.createDataFrame(
    [
        (1, '10-01-2020', '1.0', '100'),
        (2, '14-02-2021', '2.0', '200'),
        (3, '15-06-2019', '3.0', '300'),
        (4, '12-12-2020', '4.0', '400'),
        (5, '01-09-2019', '5.0', '500'),
    ],
    ['colA', 'colB', 'colC', 'colD']
)

df.show()
+----+----------+----+----+
|colA|      colB|colC|colD|
+----+----------+----+----+
|   1|10-01-2020| 1.0| 100|
|   2|14-02-2021| 2.0| 200|
|   3|15-06-2019| 3.0| 300|
|   4|12-12-2020| 4.0| 400|
|   5|01-09-2019| 5.0| 500|
+----+----------+----+----+
df.printSchema()
root
 |-- colA: long (nullable = true)
 |-- colB: string (nullable = true)
 |-- colC: string (nullable = true)
 |-- colD: string (nullable = true)

В следующих разделах мы продемонстрируем, как изменить тип столбца colB, colC и colD на DateType, DoubleType и IntegerType соответственно.

Использование функции cast ()

Первый вариант, который у вас есть, когда дело доходит до преобразования типов данных, - это функция pyspark.sql.Column.cast(), которая преобразует входной столбец в указанный тип данных.

from datetime import datetime
from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType, IntegerType, DateType

# UDF to process the date column
func = udf(lambda x: datetime.strptime(x, '%d-%m-%Y'), DateType())
df = df \
    .withColumn('colB', func(col('colB'))) \
    .withColumn('colC', col('colC').cast(DoubleType())) \
    .withColumn('colD', col('colD').cast(IntegerType()))

df.show()
+----+----------+----+----+
|colA|      colB|colC|colD|
+----+----------+----+----+
|   1|2020-01-10| 1.0| 100|
|   2|2021-02-14| 2.0| 200|
|   3|2019-06-15| 3.0| 300|
|   4|2020-12-12| 4.0| 400|
|   5|2019-09-01| 5.0| 500|
+----+----------+----+----+
df.printSchema()
root
 |-- colA: long (nullable = true)
 |-- colB: date (nullable = true)
 |-- colC: double (nullable = true)
 |-- colD: integer (nullable = true)

Обратите внимание, что для преобразования строки в DateType нам необходимо указать UDF для обработки точного формата даты в строке.

Использование функции selectExpr ()

В качестве альтернативы вы можете использовать функцию pyspark.sql.DataFrame.selectExpr, указав соответствующие выражения SQL, которые могут приводить тип данных к нужным столбцам, как показано ниже.

df = df.selectExpr(
    'colA',
    'to_date(colB, \'dd-MM-yyyy\') colB',
    'cast(colC as double) colC',
    'cast(colD as int) colD',
)
df.show()
+----+----------+----+----+
|colA|      colB|colC|colD|
+----+----------+----+----+
|   1|2020-01-10| 1.0| 100|
|   2|2021-02-14| 2.0| 200|
|   3|2019-06-15| 3.0| 300|
|   4|2020-12-12| 4.0| 400|
|   5|2019-09-01| 5.0| 500|
+----+----------+----+----+
df.printSchema()
root
 |-- colA: long (nullable = true)
 |-- colB: date (nullable = true)
 |-- colC: double (nullable = true)
 |-- colD: integer (nullable = true)

Использование Spark SQL

Наконец, вы даже можете использовать Spark SQL для преобразования нужных столбцов аналогично тому, как мы использовали для этого функцию selectExpr.

# First we need to register the DF as a global temporary view
df.createGlobalTempView("df")
df = spark_session.sql(
    """
    SELECT 
        colA,
        to_date(colB, 'dd-MM-yyyy') colB,
        cast(colC as double) colC,
        cast(colD as int) colD
    FROM global_temp.df
    """
)
df.show()
+----+----------+----+----+
|colA|      colB|colC|colD|
+----+----------+----+----+
|   1|2020-01-10| 1.0| 100|
|   2|2021-02-14| 2.0| 200|
|   3|2019-06-15| 3.0| 300|
|   4|2020-12-12| 4.0| 400|
|   5|2019-09-01| 5.0| 500|
+----+----------+----+----+
df.printSchema()
root
 |-- colA: long (nullable = true)
 |-- colB: date (nullable = true)
 |-- colC: double (nullable = true)
 |-- colD: integer (nullable = true)

Последние мысли

В сегодняшнем кратком руководстве мы обсудили несколько различных способов изменения типов столбцов столбцов DataFrame в PySpark. В частности, мы изучили, как можно использовать withColumn() функцию в сочетании с cast(), а также использовать подходы, похожие на SQL, такие как selectExpr() или Spark SQL.

Станьте участником и читайте все новости на Medium. Ваш членский взнос напрямую поддерживает меня и других писателей, которых вы читаете. Вы также получите полный доступ ко всем историям на Medium.

Вам также может понравиться