Мы можем использовать explode
функция для ее решения.
# Importing requisite functions.
from pyspark.sql.functions import array, col, explode, struct, lit
# Creating the DataFrame
df = sqlContext.createDataFrame([(10,'AA','aa','Aa'),(20,'BB','bb','Bb'),(30,'CC','cc','Cc')],['base','1','2','3'])
df.show()
+----+---+---+---+
|base| 1| 2| 3|
+----+---+---+---+
| 10| AA| aa| Aa|
| 20| BB| bb| Bb|
| 30| CC| cc| Cc|
+----+---+---+---+
Написание функции для взрыва DataFrame
.
def to_explode(df, by):
# Filter dtypes and split into column names and type description
cols, dtypes = zip(*((c, t) for (c, t) in df.dtypes if c not in by))
# Spark SQL supports only homogeneous columns
assert len(set(dtypes)) == 1, "All columns have to be of the same type"
# Create and explode an array of (column_name, column_value) structs
kvs = explode(array([
struct(lit(c).alias("key"), col(c).alias("val")) for c in cols
])).alias("kvs")
return df.select(by + [kvs]).select(by + ["kvs.key", "kvs.val"])
Применение функции ниже. Поскольку созданный столбец new_base
имеет decimal
, так как по умолчанию он имеет тип double
, поэтому мы явно преобразуем его в integer
, чтобы избежать добавления к каждому числу суффикса .0
.
df = to_explode(df, ['base'])
df = df.withColumn('new_base',col('base')+col('key'))\
.select(col('new_base').cast(IntegerType()),'val')
df.show()
+--------+---+
|new_base|val|
+--------+---+
| 11| AA|
| 12| aa|
| 13| Aa|
| 21| BB|
| 22| bb|
| 23| Bb|
| 31| CC|
| 32| cc|
| 33| Cc|
+--------+---+
df.printSchema()
root
|-- new_base: integer (nullable = true)
|-- val: string (nullable = true)
person
cph_sto
schedule
16.10.2019