Spark создает массив полей с тем же ключом

У меня есть таблица кустов, которая присутствует поверх искрового контекста. Формат таблицы приведен ниже.

| key | param1 | Param 2|
-------------------------
| A   |  A11   | A12    |
| B   |  B11   | B12    |
| A   |  A21   | A22    |

Я хотел создать DataFrame со схемой

val dataSchema = new StructType(
    Array(
    StructField("key", StringType, nullable = true),
    StructField("param", ArrayType(
        StructType( Array(
            StructField("param1", StringType, nullable = true),
            StructField("param2", StringType, nullable = true)
        )), containsNull = true), nullable = true)
    )
)

из таблицы выше

Так что итоговая таблица станет

| key | param                                               |
-------------------------------------------------------------
| A   |  [{param1:A11, param2:A12},{param1:A11, param2:A12}]|
| B   |  [{param1:B11, param2:B12}]                         |

Я загружаю таблицу, используя контекст куста (hiveContext.table («table_name»)), который возвращает фрейм данных.

scala> val df = hiveContext.table("sample")
df: org.apache.spark.sql.DataFrame = [fk: string, param1: string, param2: string]
scala> val dfStruct = df.select($"key", struct($"param1", $"param2").alias("param"))
dfStruct: org.apache.spark.sql.DataFrame = [fk: string, sub: struct<param1:string,param2:string>]
scala> dfStruct.show
+--+----------+
|fk|     param|
+--+----------+
| A| [A11,A12]|
| B| [B11,B12]|
| A| [A21,A22]|
+--+----------+
scala> 

Я пытаюсь использовать фрейм данных для преобразования в таблицу, как указано выше, используя groupBy. Но не в состоянии сделать.


person abilng    schedule 06.01.2017    source источник


Ответы (1)


Я нашел себя.

Ключ использует case class, а не structType

case class Param(param1: String, param2:String)
case class Sample(key: String, param:Array[Param])

val df = hiveContext.table("sample_sub")

val SampleDF = df.select($"fk", $"param1", $"param2")
val SampleDFMap = SampleDF.rdd.groupBy(r => r.getAs[String]("fk"))
val SampleJoinRDD =  SampleDFMap.map(
    r => Sample(r._1.asInstanceOf[String], r._2.map (
        row => Param(row(1).asInstanceOf[String],row(2).asInstanceOf[String])
        ).toArray
    )
)

SampleJoinRDD.toDF.toJSON.collect
// Array({"key":"A","param":[{"param1":"A11","param2":"A12"},{"param1":"A21","param2":"A22"}]}, {"key":"B","param":[{"param1":"B11","param2":"B12"}]})
person abilng    schedule 09.01.2017