Плоская карта PySpark должна возвращать кортежи с типизированными значениями

Я использую блокнот Jupyter с PySpark. Внутри этого у меня есть фреймворк данных со схемой с именами и типами столбцов (целое число,...) для этих столбцов. Теперь я использую такие методы, как flatMap, но это возвращает список кортежей, у которых больше нет фиксированного типа. Есть ли способ добиться этого?

df.printSchema()
root
 |-- name: string (nullable = true)
 |-- ...
 |-- ...
 |-- ratings: integer (nullable = true)

Затем я использую flatMap для выполнения некоторых расчетов со значениями рейтинга (здесь они запутаны):

df.flatMap(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings))
y_rate.toDF().printSchema()

И теперь я получаю сообщение об ошибке:

TypeError: Не удается вывести схему для типа:

Есть ли способ использовать map/flatMap/reduce, сохранив схему? или, по крайней мере, возвращать кортежи со значениями определенного типа?


person Matthias    schedule 14.05.2016    source источник


Ответы (1)


Прежде всего, вы используете неправильную функцию. flatMap будет map и flatten, поэтому предположим, что ваши данные выглядят так:

df = sc.parallelize([("foo", 0), ("bar", 10)]).toDF(["id", "ratings"])

вывод flatMap будет эквивалентен:

sc.parallelize(['foo', 0, 'bar', 5])

Отсюда и ошибка, которую вы видите. Если вы действительно хотите, чтобы это работало, вы должны использовать map:

df.rdd.map(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings)).toDF()
## DataFrame[_1: string, _2: bigint]

Далее, сопоставление DataFrame больше не поддерживается в версии 2.0. Сначала вы должны извлечь rdd (см. df.rdd.map выше).

Наконец, передача данных между Python и JVM крайне неэффективна. Это не только требует передачи данных между Python и JVM с соответствующей сериализацией/десериализацией и выводом схемы (если схема не указана явно), что также устраняет лень. Для таких вещей лучше использовать выражения SQL:

from pyspark.sql.functions import when

df.select(df.id, when(df.ratings > 5, 5).otherwise(df.ratings))

Если по какой-то причине вам нужен простой код Python, UDF может быть лучшим выбором.

person zero323    schedule 14.05.2016
comment
Чрезвычайно полезно. спасибо за ваш пример кода. Я просто не понял часть с flatMap vs Map. - person Matthias; 14.05.2016
comment
flatMap является функцией RDD[T] => (T => Iterable[U]) => RDD[U]. Другими словами, он ожидает, что функция вернет Itereble (кортеж Python), и объединяет их (выравнивает) результат. - person zero323; 14.05.2016
comment
Есть ли способ указать имя столбца «когда/иначе» в этом утверждении? см. df.select(df.id, when(df.ratings > 5, 5).otherwise(df.ratings)) @zero323 - person Matthias; 18.05.2016
comment
Да, вы можете использовать alias, например: when(df.ratings > 5, 5).otherwise(df.ratings).alias("foo"). - person zero323; 18.05.2016