Spark Dataframe/набор данных: общая условная кумулятивная сумма

У меня есть кадр данных, который имеет несколько атрибутов (от C1 до C2), смещение (в днях) и несколько значений (V1, V2).

val inputDF= spark.sparkContext.parallelize(Seq((1,2,30, 100, -1),(1,2,30, 100, 0), (1,2,30, 100, 1),(11,21,30, 100, -1),(11,21,30, 100, 0), (11,21,30, 100, 1)), 10).toDF("c1", "c2", "v1", "v2", "offset")
inputDF: org.apache.spark.sql.DataFrame = [c1: int, c2: int ... 3 more fields]

scala> inputDF.show
+---+---+---+---+------+
| c1| c2| v1| v2|offset|
+---+---+---+---+------+
|  1|  2| 30|100|    -1|
|  1|  2| 30|100|     0|
|  1|  2| 30|100|     1|
| 11| 21| 30|100|    -1|
| 11| 21| 30|100|     0|
| 11| 21| 30|100|     1|
+---+---+---+---+------+

Что мне нужно сделать, так это рассчитать совокупную сумму для V1, V2 для (c1,c2) ​​по смещению.

Я пробовал это, но это далеко от общего решения, которое могло бы работать с любым фреймом данных.

import org.apache.spark.sql.expressions.Window

val groupKey = List("c1", "c2").map(x => col(x.trim))
val orderByKey = List("offset").map(x => col(x.trim))

val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*)

val outputDF = inputDF
  .withColumn("cumulative_v1", sum(inputDF("v1")).over(w))
  .withColumn("cumulative_v2", sum(inputDF("v2")).over(w))

+---+---+---+---+------+----------------------------
| c1| c2| v1| v2|offset|cumulative_v1| cumulative_v2|
+---+---+---+---+------+-------------|--------------|
|  1|  2| 30|100|    -1|30           | 100          |
|  1|  2| 30|100|     0|60           | 200          |
|  1|  2| 30|100|     1|90           | 300          |
| 11| 21| 30|100|    -1|30           | 100          |
| 11| 21| 30|100|     0|60           | 200          |
| 11| 21| 30|100|     1|90           | 300          |
+---+---+---+---+------+-----------------------------

Проблема заключается в следующем: [a] мне нужно сделать это в нескольких окнах с переменным смещением (от -1 до 1), (от -10 до 10), (от -30 до 30) или любых других [b] мне нужно использовать эту функцию во всех окнах. несколько фреймов данных/наборов данных, поэтому я надеюсь на общую функцию, которая могла бы работать в RDD/наборе данных.

Любые мысли о том, как я могу добиться этого в Spark 2.0?

Помощь очень ценится. Спасибо!


person Yash    schedule 18.02.2017    source источник
comment
Добро пожаловать в Stack Overflow! Мы сайт вопросов и ответов, а не служба кодеров по найму. Пожалуйста, объясните, что вы пробовали до сих пор и почему это не сработало. См.: Почему кто-нибудь может мне помочь? не актуальный вопрос?   -  person Joe C    schedule 19.02.2017
comment
Спасибо. Я пришел к вышеуказанному набору результатов с моим решением. Добавление сейчас.   -  person Yash    schedule 19.02.2017


Ответы (2)


Вот примитивный пример, использующий только фреймы данных.

import org.apache.spark.sql.expressions.Window

val groupKey = List("c1", "c2").map(x => col(x.trim))
val orderByKey = List("offset").map(x => col(x.trim))

val w = Window.partitionBy(groupKey: _*).orderBy(orderByKey: _*)

val inputDF= spark
  .sparkContext
  .parallelize(Seq((1,2,30, 100, -1),(1,2,3, 100, -2),(1,2,140, 100, 2),(1,2,30, 100, 0), (1,2,30, 100, 1),(11,21,30, 100, -1),(11,21,30, 100, 0), (11,21,30, 100, 1)), 10)
  .toDF("c1", "c2", "v1", "v2", "offset")

val outputDF = inputDF
  .withColumn("cumulative_v1", sum(when($"offset".between(-1, 1), inputDF("v1")).otherwise(0)).over(w))
  .withColumn("cumulative_v3", sum(when($"offset".between(-2, 2), inputDF("v1")).otherwise(0)).over(w))
  .withColumn("cumulative_v2", sum(inputDF("v2")).over(w))

Это дает кумулятивную сумму по одному «значению» для разных окон.

scala> outputDF.show
+---+---+---+---+------+-------------+-------------+-------------+              
| c1| c2| v1| v2|offset|cumulative_v1|cumulative_v3|cumulative_v2|
+---+---+---+---+------+-------------+-------------+-------------+
|  1|  2|  3|100|    -2|            0|            0|          100|
|  1|  2| 30|100|    -1|           30|           30|          200|
|  1|  2| 30|100|     0|           60|           60|          300|
|  1|  2| 30|100|     1|           90|           90|          400|
|  1|  2|140|100|     2|           90|           90|          500|
| 11| 21| 30|100|    -1|           30|           30|          100|
| 11| 21| 30|100|     0|           60|           60|          200|
| 11| 21| 30|100|     1|           90|           90|          300|
+---+---+---+---+------+-------------+-------------+-------------+

Пара недостатков этого подхода - [1] для каждого условного окна (-1,1), (-2,2) или любого (from_offset, to_offset) необходимо вызывать sum() отдельно. [2] это не общая функция.

Я знаю, что spark принимает переменный список столбцов для агрегатных функций, подобных этому:

val exprs = Map("v1" -> "sum", "v2" -> "sum")

Но я не уверен, как расширить это для оконных функций с переменными условиями. Мне все еще очень любопытно узнать, есть ли лучшая модульная/многоразовая функция, которую мы можем написать для решения этой проблемы.

person Yash    schedule 20.02.2017

Другой общий способ решить эту проблему - с помощью foldLeft, как описано здесь - https://stackoverflow.com/a/44532867/7059145

person Yash    schedule 14.06.2017