Как разбить столбец с несколькими значениями на отдельные строки с помощью типизированного набора данных?

Я столкнулся с проблемой, как разбить столбец с несколькими значениями, то есть List[String], на отдельные строки.

Исходный набор данных имеет следующие типы: Dataset[(Integer, String, Double, scala.List[String])]

+---+--------------------+-------+--------------------+
| id|       text         | value |    properties      |
+---+--------------------+-------+--------------------+
|  0|Lorem ipsum dolor...|    1.0|[prp1, prp2, prp3..]|
|  1|Lorem ipsum dolor...|    2.0|[prp4, prp5, prp6..]|
|  2|Lorem ipsum dolor...|    3.0|[prp7, prp8, prp9..]|

Результирующий набор данных должен иметь следующие типы:

Dataset[(Integer, String, Double, String)]

и properties следует разделить так, чтобы:

+---+--------------------+-------+--------------------+
| id|       text         | value |    property        |
+---+--------------------+-------+--------------------+
|  0|Lorem ipsum dolor...|    1.0|        prp1        |
|  0|Lorem ipsum dolor...|    1.0|        prp2        |
|  0|Lorem ipsum dolor...|    1.0|        prp3        |
|  1|Lorem ipsum dolor...|    2.0|        prp4        |
|  1|Lorem ipsum dolor...|    2.0|        prp5        |
|  1|Lorem ipsum dolor...|    2.0|        prp6        |

person Adam    schedule 21.04.2017    source источник


Ответы (3)


Часто предлагается explode, но он из нетипизированного API DataFrame и, учитывая, что вы используете Dataset, я думаю, что оператор flatMap может быть более подходящим (см. org.apache.spark.sql.Dataset).

flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U]

(Специально для Scala) Возвращает новый набор данных, сначала применяя функцию ко всем элементам этого набора данных, а затем сглаживая результаты.

Вы можете использовать его следующим образом:

val ds = Seq(
  (0, "Lorem ipsum dolor", 1.0, Array("prp1", "prp2", "prp3")))
  .toDF("id", "text", "value", "properties")
  .as[(Integer, String, Double, scala.List[String])]

scala> ds.flatMap { t => 
  t._4.map { prp => 
    (t._1, t._2, t._3, prp) }}.show
+---+-----------------+---+----+
| _1|               _2| _3|  _4|
+---+-----------------+---+----+
|  0|Lorem ipsum dolor|1.0|prp1|
|  0|Lorem ipsum dolor|1.0|prp2|
|  0|Lorem ipsum dolor|1.0|prp3|
+---+-----------------+---+----+

// or just using for-comprehension
for {
  t <- ds
  prp <- t._4
} yield (t._1, t._2, t._3, prp)
person Jacek Laskowski    schedule 21.04.2017
comment
Вы также можете использовать DataFrame, но, конечно, лучше с DataSet. Я позаимствовал ваш пример и изменил его на DataFrames stackoverflow.com/questions/40397740/ - person ruloweb; 27.02.2019

Вы можете использовать explode:

df.withColumn("property", explode($"property"))

Пример:

val df = Seq((1, List("a", "b"))).toDF("A", "B")   
// df: org.apache.spark.sql.DataFrame = [A: int, B: array<string>]

df.withColumn("B", explode($"B")).show
+---+---+
|  A|  B|
+---+---+
|  1|  a|
|  1|  b|
+---+---+
person Psidom    schedule 21.04.2017

Вот один из способов сделать это:

val myRDD = sc.parallelize(Array(
  (0, "text0", 1.0, List("prp1", "prp2", "prp3")),
  (1, "text1", 2.0, List("prp4", "prp5", "prp6")),
  (2, "text2", 3.0, List("prp7", "prp8", "prp9"))
)).map{
  case (i, t, v, ps) => ((i, t, v), ps)
}.flatMapValues(x => x).map{
  case ((i, t, v), p) => (i, t, v, p)
}
person Leo C    schedule 21.04.2017
comment
О нет. Это RDD API? Зачем людям это нужно в эпоху набора данных? - person Jacek Laskowski; 22.04.2017
comment
Я думаю, что и RDD, и DataSet имеют свое место, хотя в этом случае я согласен, что прямая работа с DataSet - лучший подход. - person Leo C; 22.04.2017