Обновление значений переменных в UDF Pyspark

Я хочу иметь функцию udf, которая проходит через столбец «Значения» и проверяет, составляет ли следующее значение 50% или более от текущего значения строки. Если он находится в пределах 50% процентов, я хочу включить значение «да», если нет, то я не хочу включать это значение. Если значение падает слишком быстро между последним значением и следующим значением, его не следует включать, но если оно постепенно падает, а не более чем на 50% по сравнению с последним включенным значением, тогда все в порядке. Вот почему .1 для идентификатора 5 не был включен, но был включен .1 для идентификатора 9, потому что он следовал за значением, которое постепенно снижалось с .4 не более чем на 50%. Я думал о наличии переменной в udf для отслеживания последнего допустимого значения, но я не уверен, как это сделать.

rows = sc.parallelize([[1, .9, 'yes'], [2, .7, 'yes'], [3, .4, 'yes'], [4, .15, 'no'], [5, .1, 'no'], [7, .3, 'yes'], [8, .2, 'yes'], [9, .1, 'yes']])

rows_df = rows.toDF(["ID",  'Values', 'Include'])

#preview data
rows_df.show()

#show data schema
rows_df.printSchema()

+---+------+-------+
| ID|Values|Include|
+---+------+-------+
|  1|   0.9|    yes|
|  2|   0.7|    yes|
|  3|   0.4|    yes|
|  4|  0.15|     no|
|  5|   0.1|     no|
|  7|   0.3|    yes|
|  8|   0.2|    yes|
|  9|   0.1|    yes|
+---+------+-------+

person Maria Nazari    schedule 16.09.2019    source источник


Ответы (1)


Для достижения своей цели вам не обязательно использовать UDF (на самом деле я не думаю, что это возможно), вместо этого вы можете использовать различные функции, которые работают с окном, например lag.

Должен признаться, что не совсем понимаю ваше требование (почему 5. должно быть «нет»?), Но между lag, lead и last вы сможете его реализовать. Вы можете узнать о них больше в документы. Пример, который выполняет логику на основе предыдущего значения:

from pyspark.sql import Window
from pyspark.sql.functions import col, lag, when, lit

windowSpec = Window.orderBy("Id")

withPrevious = rows_df.withColumn("prevVal", lag(rows_df["Values"]).over(windowSpec))

withPrevious.withColumn("Include2", 
                        when(col("prevVal").isNull(), "yes")\
                        .when(col("Values") >= 0.5 * col("prevVal"), lit("yes"))\
                        .otherwise("no"))\
    .show()
+---+------+-------+-------+--------+
| ID|Values|Include|prevVal|Include2|
+---+------+-------+-------+--------+
|  1|   0.9|    yes|   null|     yes|
|  2|   0.7|    yes|    0.9|     yes|
|  3|   0.4|    yes|    0.7|     yes|
|  4|  0.15|     no|    0.4|      no|
|  5|   0.1|     no|   0.15|     yes|
|  7|   0.3|    yes|    0.1|     yes|
|  8|   0.2|    yes|    0.3|     yes|
|  9|   0.1|    yes|    0.2|     yes|
+---+------+-------+-------+--------+
person Daniel    schedule 17.09.2019
comment
Я хочу, чтобы идентификатор строки 5 был нет, но строка 9 была да, я думаю, что мне нужно будет перебирать значения строки с помощью цикла for, если udf невозможно. - person Maria Nazari; 18.09.2019
comment
Не могли бы вы объяснить шаг за шагом, как вы рассчитываете «нет» для 5? - person Daniel; 18.09.2019
comment
при id1 значение 0,9, а id 2 - да, потому что 0,7 меньше 50 процентов, последнее принятое значение для сравнения - 0,7, поэтому id3 - да, последнее принятое значение сейчас - 0,4, а следующие два идентификаторы 4-5 - нет, потому что они уменьшаются более чем на 50% с 0,4, тогда мы переходим к идентификатору 7 с .3, который является «да», и новое принятое значение становится 0,3, а следующий идентификатор также является «да» и новым av становится .2, и поскольку .1 для идентификатора 9 не падает более чем на 50%, здесь принимается - person Maria Nazari; 18.09.2019