DataFrame в массив Jsons

У меня есть кадр данных, как показано ниже

+-------------+-------------+-------------+
| columnName1 | columnName2 | columnName3 |
+-------------+-------------+-------------+
| 001         | 002         | 003         |
+-------------+-------------+-------------+
| 004         | 005         | 006         |
+-------------+-------------+-------------+

Я хочу преобразовать в JSON, как и ожидалось, в формате ниже.

ОЖИДАЕМЫЙ ФОРМАТ

[[{"key":"columnName1","value":"001"},{"key":"columnName2","value":"002"},{"key":"columnName1","value":"003"}],[{"key":"columnName1","value":"004"},{"key":"columnName2","value":"005"},{"key":"columnName1","value":"006"}]]

Заранее спасибо

Я пробовал это с playjson api

val ColumnsNames: Seq[String] = DF.columns.toSeq
    val result= DF
      .limit(recordLimit)
      .map { row =>
        val kv: Map[String, String] = row.getValuesMap[String](allColumns)
        kv.map { x =>
          Json
            .toJson(
              List(
                ("key"   -> x._1),
                ("value" -> x._2)
              ).toMap
            )
            .toString()
        }.mkString("[", ", ", "]")
      }
      .take(10)

Теперь он идет в таком формате:

["[{"key":"columnName1","value":"001"},{"key":"columnName2","value":"002"},{"key":"columnName1","value":"003"}]","[{"key":"columnName1","value":"004"},{"key":"columnName2","value":"005"},{"key":"columnName1","value":"006"}]"]

Но мне нужно в этом ожидаемом формате с playjson с кодировщиками

[[{"key":"columnName1","value":"001"},{"key":"columnName2","value":"002"},{"key":"columnName1","value":"003"}],[{"key":"columnName1","value":"004"},{"key":"columnName2","value":"005"},{"key":"columnName1","value":"006"}]]

столкнулся с этой проблемой

Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
[error]       .map { row =>

В основном преобразование Array[String] в Array[Array[Jsvalue]]


person Learnis    schedule 13.09.2019    source источник


Ответы (3)


Вышеупомянутое исключение возникает, потому что Spark не имеет кодировщика для Jsvalue для десериализации/сериализации DF. Проверьте пользовательский кодировщик Spark. Однако вместо возврата JSValue JS.toString может быть возвращен внутри операции карты DF.

Одним из подходов может быть:

  1. Преобразование строки DF в совместимый JSON — [{"key":"columnName1","value":"001"},{"key":"columnName2","value":"002"},{"key": "имя_столбца1","значение":"003"}]
  2. Соберите DF как массив/список mkstring, используя разделитель
  3. Заключенная выше строка внутри "[]"

Предупреждение ниже: код использует Collect, это может задушить драйвер Spark

//CSV
c1,c2,c3
001,002,003
004,005,006

//Code
val df = spark.read.option("header", "true").csv("array_json.csv")
val allColumns = df.schema.map(s => s.name)
//Import Spark implicits Encoder
import spark.implicits._ 
val sdf = df.map(row => {
  val kv = row.getValuesMap[String](allColumns)
  Json.toJson(kv.map(x => {
    List(
      "key" -> x._1,
      "value" -> x._2
    ).toMap
  })).toString()
})

val dfString = sdf.collect().mkString(",")
val op = s"[$dfString]"
println(op)

Выход:

[[{"key":"c1","value":"001"},{"key":"c2","value":"002"},{"key":"c3","value":"003"}],[{"key":"c1","value":"004"},{"key":"c2","value":"005"},{"key":"c3","value":"006"}]]
person hagarwal    schedule 14.09.2019
comment
И мой код, и ваш код дают одинаковый результат, и мой код более надежен, когда мы работаем в BigDataWorld. В любом случае, спасибо, хорошая попытка. - person Learnis; 14.09.2019

Другой подход без RDD:

  import spark.implicits._
  val df = List((1, 2, 3), (11, 12, 13), (21, 22, 23)).toDF("A", "B", "C")

  val fKeyValue = (name: String) => 
                   struct(lit(name).as("key"), col(name).as("value"))

  val lstCol = df.columns.foldLeft(List[Column]())((a, b) => fKeyValue(b) :: a)

  val dsJson =
     df
        .select(collect_list(array(lstCol: _*)).as("obj"))
        .toJSON

  import play.api.libs.json._
  val json: JsValue = Json.parse(dsJson.first())
  val arrJson = json \ "obj"
  println(arrJson)
person Gabri    schedule 14.09.2019

    val ColumnsNames: Seq[String] = DF.columns.toSeq
        val result= Json.parse(DF
          .limit(recordLimit)
          .map { row =>
            val kv: Map[String, String] = row.getValuesMap[String](allColumns)
            kv.map { x =>
              Json
                .toJson(
                  List(
                    ("key"   -> x._1),
                    ("value" -> x._2)
                  ).toMap
                )
                .toString()
            }.mkString("[", ", ", "]")
          }
          .take(10).mkstring("[", ", ", "]"))

дает


    [[{"key":"columnName1","value":"001"},{"key":"columnName2","value":"002"},{"key":"columnName1","value":"003"}],[{"key":"columnName1","value":"004"},{"key":"columnName2","value":"005"},{"key":"columnName1","value":"006"}]]

person Learnis    schedule 14.09.2019