Добавить столбец с внутрипредметным днем ​​взаимодействия с упорядоченным рангом в pyspark

У меня есть большой фреймворк pyspark, содержащий данные о взаимодействии с пользователем за многолетний период. Столбцов много, но три полезных для этого вопроса - userid, interaction_date и interaction_timestamp. Предположим, что для данного пользователя в таблице есть несколько записей.

Мне нужно написать функцию для добавления столбца, который будет указывать количество дней до последнего наблюдаемого взаимодействия для данного клиента в таблице. Например, для таблицы ввода

example_table_1

Я хотел бы добавить столбец, который отсчитывается от даты самого последнего взаимодействия для этого пользователя (например, дата последнего взаимодействия - 1, дата следующего предыдущего взаимодействия - 2 и т. Д.):

желаемая_output_table

Может ли кто-нибудь подвести меня к правильному пути?


person Kyle.    schedule 08.10.2019    source источник
comment
Избегайте снимков экрана в stackoverflow.   -  person cronoik    schedule 09.10.2019
comment
Ok! Сначала я попытался создать таблицу вручную, но она выглядела искаженно: /   -  person Kyle.    schedule 09.10.2019


Ответы (1)


Этого можно добиться с помощью окна < / a> функция, такая как плотный_ранк. Взгляните на комментарии ниже:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

cols = ['userid','interaction_timestamp']
data =[( '1'        ,'2018-01-02' ),
( '2'        , '2018-01-03' ),
( '1'        , '2018-01-03' ),
( '1'        , '2018-01-04' ),
( '2'        , '2018-01-02' ),
( '3'        , '2018-01-03' ),
( '4'        , '2018-01-03' )]

df = spark.createDataFrame(data, cols)

df = df.withColumn('interaction_timestamp', F.to_date('interaction_timestamp', 'yyyy-MM-dd'))

#rows with the same userid become part of the the same partition
#these partitions will be ordered descending by interaction_timestamp
w = Window.partitionBy('userid').orderBy(F.desc('interaction_timestamp'))

#dense_rank will assign a number to each row according to the defined order
df.withColumn("interaction_date_order", F.dense_rank().over(w)).show()

Выход:

+------+---------------------+----------------------+ 
|userid|interaction_timestamp|interaction_date_order| 
+------+---------------------+----------------------+ 
|     3|           2018-01-03|                     1| 
|     1|           2018-01-04|                     1| 
|     1|           2018-01-03|                     2| 
|     1|           2018-01-02|                     3| 
|     4|           2018-01-03|                     1| 
|     2|           2018-01-03|                     1| 
|     2|           2018-01-02|                     2|
+------+---------------------+----------------------+
person cronoik    schedule 09.10.2019
comment
Фантастика! Спасибо! - person Kyle.; 09.10.2019