Применение функции окна для вычисления различий в pySpark

Я использую pySpark и настроил свой фрейм данных с двумя столбцами, представляющими дневную цену актива следующим образом:

ind = sc.parallelize(range(1,5))
prices = sc.parallelize([33.3,31.1,51.2,21.3])
data = ind.zip(prices)
df = sqlCtx.createDataFrame(data,["day","price"])

Я получаю при подаче заявки df.show():

+---+-----+
|day|price|
+---+-----+
|  1| 33.3|
|  2| 31.1|
|  3| 51.2|
|  4| 21.3|
+---+-----+

Что нормально и все. Я хотел бы иметь еще один столбец, содержащий ежедневные доходы столбца цен, то есть что-то вроде

(price(day2)-price(day1))/(price(day1))

После долгих исследований мне сказали, что это наиболее эффективно достигается за счет применения pyspark.sql.window функций, но я не могу понять, как это сделать.


person Thomas Moore    schedule 19.04.2016    source источник
comment
Я предполагаю, что sqlCtx эквивалентен объекту 'spark', который получается с помощью sc = SparkContext ('local') spark = SparkSession (sc)   -  person Nir    schedule 26.09.2018


Ответы (2)


Вы можете перенести столбец предыдущего дня, используя lag и добавьте дополнительный столбец, который фактически обеспечивает ежедневную отдачу от двух столбцов, но вам, возможно, придется указать Spark, как разделить ваши данные и / или приказать ему выполнять задержку, например это:

from pyspark.sql.window import Window
import pyspark.sql.functions as func
from pyspark.sql.functions import lit

dfu = df.withColumn('user', lit('tmoore'))

df_lag = dfu.withColumn('prev_day_price',
                        func.lag(dfu['price'])
                                 .over(Window.partitionBy("user")))

result = df_lag.withColumn('daily_return', 
          (df_lag['price'] - df_lag['prev_day_price']) / df_lag['price'] )

>>> result.show()
+---+-----+-------+--------------+--------------------+
|day|price|   user|prev_day_price|        daily_return|
+---+-----+-------+--------------+--------------------+
|  1| 33.3| tmoore|          null|                null|
|  2| 31.1| tmoore|          33.3|-0.07073954983922816|
|  3| 51.2| tmoore|          31.1|         0.392578125|
|  4| 21.3| tmoore|          51.2|  -1.403755868544601|
+---+-----+-------+--------------+--------------------+

Вот более подробное введение в Оконные функции в Spark.

person Oleksiy    schedule 20.04.2016
comment
Привет. Спасибо! Это очень полезно. Кстати, а что делает засветившаяся функция? - person Thomas Moore; 21.04.2016
comment
lit - создает столбец с буквальным значением - spark.apache.org/docs/latest/api/python/ - person Oleksiy; 21.04.2016
comment
второстепенное примечание. также рекомендуется отсортировать столбец, к которому применяется задержка, например Window.partitionBy (пользователь) .orderBy (день, по возрастанию = True) - person Quetzalcoatl; 28.08.2018
comment
При оценке df_lag я получаю сообщение об ошибке: задержка оконной функции (цена # 66, 1, null) требует упорядочивания окна, dfu.withColumn ('prev_day_price', func.lag (dfu ['price']). Over (Window. orderBy (user))) решает эту проблему - person Nir; 26.09.2018
comment
Как этого можно достичь с помощью искровой структурированной потоковой передачи? - person Nagesh; 05.05.2019
comment
Может ли кто-нибудь предложить, как это можно реализовать с помощью Spark Streaming? - person Nagesh; 06.05.2019

Функция Lag может помочь вам решить ваш вариант использования.

from pyspark.sql.window import Window
import pyspark.sql.functions as func

### Defining the window 
Windowspec=Window.orderBy("day")

### Calculating lag of price at each day level
prev_day_price= df.withColumn('prev_day_price',
                        func.lag(dfu['price'])
                                .over(Windowspec))

### Calculating the average                                  
result = prev_day_price.withColumn('daily_return', 
          (prev_day_price['price'] - prev_day_price['prev_day_price']) / 
prev_day_price['price'] )
person Sushmita Konar    schedule 28.03.2019