Заполните нулевые значения столбца фрейма данных Pyspark средним значением из того же столбца

С таким фреймом данных,

rdd_2 = sc.parallelize([(0,10,223,"201601"), (0,10,83,"2016032"),(1,20,None,"201602"),(1,20,3003,"201601"), (1,20,None,"201603"), (2,40, 2321,"201601"), (2,30, 10,"201602"),(2,61, None,"201601")])

df_data = sqlContext.createDataFrame(rdd_2, ["id", "type", "cost", "date"])
df_data.show()

+---+----+----+-------+
| id|type|cost|   date|
+---+----+----+-------+
|  0|  10| 223| 201601|
|  0|  10|  83|2016032|
|  1|  20|null| 201602|
|  1|  20|3003| 201601|
|  1|  20|null| 201603|
|  2|  40|2321| 201601|
|  2|  30|  10| 201602|
|  2|  61|null| 201601|
+---+----+----+-------+

Мне нужно заполнить нулевые значения средним из существующих значений, при этом ожидаемый результат будет

+---+----+----+-------+
| id|type|cost|   date|
+---+----+----+-------+
|  0|  10| 223| 201601|
|  0|  10|  83|2016032|
|  1|  20|1128| 201602|
|  1|  20|3003| 201601|
|  1|  20|1128| 201603|
|  2|  40|2321| 201601|
|  2|  30|  10| 201602|
|  2|  61|1128| 201601|
+---+----+----+-------+

где 1128 - среднее из существующих значений. Мне нужно сделать это для нескольких столбцов.

Мой текущий подход - использовать na.fill:

fill_values = {column: df_data.agg({column:"mean"}).flatMap(list).collect()[0] for column in df_data.columns if column not in ['date','id']}
df_data = df_data.na.fill(fill_values)

+---+----+----+-------+
| id|type|cost|   date|
+---+----+----+-------+
|  0|  10| 223| 201601|
|  0|  10|  83|2016032|
|  1|  20|1128| 201602|
|  1|  20|3003| 201601|
|  1|  20|1128| 201603|
|  2|  40|2321| 201601|
|  2|  30|  10| 201602|
|  2|  61|1128| 201601|
+---+----+----+-------+

Но это очень громоздко. Любые идеи?


person Ivan    schedule 10.06.2016    source источник


Ответы (1)


Что ж, так или иначе вам придется:

  • вычислить статистику
  • заполнить бланки

Это в значительной степени ограничивает то, что вы можете здесь улучшить, но все же:

  • заменить flatMap(list).collect()[0] на first()[0] или распаковать структуру
  • вычислить всю статистику одним действием
  • использовать встроенные Row методы для извлечения словаря

Окончательный результат может быть таким:

def fill_with_mean(df, exclude=set()): 
    stats = df.agg(*(
        avg(c).alias(c) for c in df.columns if c not in exclude
    ))
    return df.na.fill(stats.first().asDict())

fill_with_mean(df_data, ["id", "date"])

В Spark 2.2 или новее вы также можете использовать Imputer. См. Раздел Замена отсутствующих значений средним - Spark Dataframe.

person zero323    schedule 10.06.2016
comment
@Kevad импортирует pyspark.sql.functions как fn, затем используйте fn.avg - person marilena.oita; 28.04.2017
comment
Кто-нибудь улучшал эту функцию? Занимаю слишком много вычислительного времени на моей стороне !? :) - person Nico Coallier; 11.07.2017
comment
@NicoCoallier С точки зрения производительности вы не получите значительного улучшения. Это довольно оптимальное решение. С точки зрения API вы можете использовать Imputer. - person zero323; 11.07.2017
comment
Быстрее исключить или включить? - person Nico Coallier; 11.07.2017
comment
@NicoCoallier Это имеет незначительное влияние, но если вы делаете это с большим количеством столбцов (например, с тысячами), накладные расходы оптимизатора будут значительными. В этом случае RDD может быть быстрее. Конечно, многое зависит от того, что вы подразумеваете под медленным ... - person zero323; 11.07.2017
comment
Мой мастер продолжает сбой в этой строке, однако, когда я уменьшаю количество данных, он работает - person Nico Coallier; 11.07.2017
comment
Медленно ~ более часа - person Nico Coallier; 11.07.2017
comment
@NicoCoallier Ну, тогда попробуй определить узкое место. Если кластер в основном простаивает, скорее всего, это планировщик. В таком случае вы можете попробовать RDDs, NumPy и StatCounter. Если задачи выполняются медленно, то это либо проблема конфигурации, либо нехватка ресурсов. - person zero323; 11.07.2017
comment
Когда я увеличиваю рабочий или разбиение, он становится медленнее - person Nico Coallier; 11.07.2017
comment
У вас есть пример из DF в RDD, fillna, а затем обратно в DF? - person Nico Coallier; 11.07.2017
comment
Есть ли способ без udf? - person Nico Coallier; 11.07.2017
comment
@NicoCoallier Вы можете попробовать что-нибудь в этих строках stackoverflow.com/a/35585532/1560062 UDF здесь вообще не применим. - person zero323; 11.07.2017