Программно сгенерируйте схему И данные для фрейма данных в Apache Spark.

Я хотел бы динамически генерировать кадр данных, содержащий запись заголовка для отчета, поэтому создавая кадр данных из значения строки ниже:

val headerDescs : String = "Name,Age,Location"

val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))

Однако теперь я хочу сделать то же самое для данных (которые фактически являются теми же данными, то есть метаданными).

Я создаю RDD:

val headerRDD = sc.parallelize(headerDescs.split(","))

Затем я намеревался использовать createDataFrame для его создания:

val headerDf = sqlContext.createDataFrame(headerRDD, headerSchema)

однако это не удается, потому что createDataframe ожидает RDD[Row], однако мой RDD представляет собой массив строк - я не могу найти способ преобразовать свой RDD в RDD Row, а затем динамически отображать поля. Примеры, которые я видел, предполагают, что вы заранее знаете количество столбцов, однако я хочу, чтобы в конечном итоге можно было изменять столбцы без изменения кода - например, имея столбцы в файле.

Фрагмент кода, основанный на первом ответе:

val headerDescs : String = "Name,Age,Location"

// create the schema from a string, splitting by delimiter
val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))

// create a row from a string, splitting by delimiter
val headerRDDRows = sc.parallelize(headerDescs.split(",")).map( a => Row(a))

val headerDf = sqlContext.createDataFrame(headerRDDRows, headerSchema)
headerDf.show()

Выполнение этого приводит к:

+--------+---+--------+

|    Name|Age|Location|

+--------+---+--------+

|    Name|

|     Age|

|Location|

+--------+---+-------

person Jon Robinson    schedule 19.01.2017    source источник


Ответы (1)


Для преобразования RDD[Array[String]] в RDD[Row] вам необходимо выполнить следующие шаги:

import org.apache.spark.sql.Row

val headerRDD = sc.parallelize(Seq(headerDescs.split(","))).map(x=>Row(x(0),x(1),x(2)))

scala> val headerSchema = StructType(headerDescs.split(",").map(fieldName => StructField(fieldName, StringType, true)))
headerSchema: org.apache.spark.sql.types.StructType = StructType(StructField(Name,StringType,true), StructField(Age,StringType,true), StructField(Location,StringType,true))

scala> val headerRDD = sc.parallelize(Seq(headerDescs.split(","))).map(x=>Row(x(0),x(1),x(2)))
headerRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[6] at map at <console>:34

scala> val headerDf = sqlContext.createDataFrame(headerRDD, headerSchema)
headerDf: org.apache.spark.sql.DataFrame = [Name: string, Age: string, Location: string]


scala> headerDf.printSchema
root
 |-- Name: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Location: string (nullable = true)



scala> headerDf.show
+----+---+--------+
|Name|Age|Location|
+----+---+--------+
|Name|Age|Location|
+----+---+--------+

Это даст вам RDD[Row]

Для чтения через файл

val vRDD = sc.textFile("..**filepath**.").map(_.split(",")).map(a => Row.fromSeq(a))
 
val headerDf = sqlContext.createDataFrame(vRDD , headerSchema)

Используя пакет Spark-CSV:

 val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") // Use first line of all files as header
    .schema(headerSchema) // defining based on the custom schema
    .load("cars.csv")

ИЛИ

val df = sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", "true") // Use first line of all files as header
    .option("inferSchema", "true") // Automatically infer data types
    .load("cars.csv")

Существуют различные варианты, которые вы можете изучить в его документации.

person Rajat Mishra    schedule 19.01.2017
comment
Спасибо за быстрый ответ, но я получаю сообщение об ошибке: 45: ошибка: разделение значений не является членом Char - person Jon Robinson; 19.01.2017
comment
@JonRobinson обновил ответ. Это должно работать - person Rajat Mishra; 19.01.2017
comment
Спасибо, это ближе, но все значения сопоставлены только с первым столбцом фрейма данных, т. Е. Все значения находятся в столбце «Имя» - я хочу «Имя» в столбце «Имя», «Возраст» в столбце «Возраст» и т. Д. - person Jon Robinson; 19.01.2017
comment
@JonRobinson, можете ли вы показать результат, который вы получаете, и код. - person Rajat Mishra; 19.01.2017
comment
@JonRobinson, потому что, если вы видите, даже схема показывает 3 столбца. - person Rajat Mishra; 19.01.2017
comment
Схема в порядке, проблема в том, что данные отображаются только в первом столбце, то есть «Имя», «Возраст», «Местоположение» отображаются в столбце «Имя». - person Jon Robinson; 23.01.2017
comment
пожалуйста, из вашего кажется, что вы создаете пустой фрейм данных. пожалуйста, опубликуйте код, где вы вставляете данные в фрейм данных. - person Rajat Mishra; 23.01.2017
comment
Обновлен исходный вопрос с кодом и выводом. - person Jon Robinson; 23.01.2017
comment
Спасибо за предложение, но я уже отказался от этого подхода - опять же, проблема в том, что вам нужно заранее знать, сколько столбцов в выводе, т. Е. Это не программно. Я хотел, чтобы, если вы обновите headerDescs, например. headerDescs = Name, Age, Location, Email, вам не придется менять другой код. - person Jon Robinson; 24.01.2017
comment
@JonRobinson, откуда вы читаете данные. Я имею в виду, это из файла или таблицы? - person Rajat Mishra; 24.01.2017
comment
@JonRobinson, если вы читаете из файла и вам нужно преобразовать его в фрейм данных, я бы посоветовал вам использовать пакет spark-csv. github.com/databricks/spark-csv - person Rajat Mishra; 24.01.2017
comment
@JonRobinson обновил ответ, чтобы создать кадр данных из файла. - person Rajat Mishra; 24.01.2017
comment
Мне нравится ваш ответ для чтения из файла, это то, что я ищу. Спасибо за помощь, Раджат. - person Jon Robinson; 25.01.2017