Spark не читает столбцы с нулевыми значениями в первой строке

Ниже приведено содержимое моего CSV-файла:

A1,B1,C1
A2,B2,C2,D1
A3,B3,C3,D2,E1
A4,B4,C4,D3
A5,B5,C5,,E2

Итак, есть 5 столбцов, но только 3 значения в первой строке.

Я прочитал его с помощью следующей команды:

val csvDF : DataFrame = spark.read
.option("header", "false")
.option("delimiter", ",")
.option("inferSchema", "false")
.csv("file.csv") 

И вот что я получаю, используя csvDF.show()

+---+---+---+
|_c0|_c1|_c2|
+---+---+---+
| A1| B1| C1|
| A2| B2| C2|
| A3| B3| C3|
| A4| B4| C4|
| A5| B5| C5|
+---+---+---+

Как я могу прочитать все данные во всех столбцах?


person Sorabh Kumar    schedule 10.08.2017    source источник
comment
можно ли добавить все 5 столбцов в каждую строку? Например, для строки 1 вместо A1, B1, C1 это будет A1, B1, C1,,   -  person Tom    schedule 10.08.2017
comment
Это всего лишь обходной путь, и он не будет работать, если csv управляется кем-то другим.   -  person Sorabh Kumar    schedule 10.08.2017
comment
Просто укажите схему вручную   -  person Zhang Tong    schedule 10.08.2017
comment
что, если мы не знаем схему, содержимое CSV не известно заранее.   -  person Sorabh Kumar    schedule 10.08.2017
comment
Весь контент в csv может быть указан как StringType   -  person Zhang Tong    schedule 10.08.2017
comment
Это даже не правильный csv. У вас нет разделителей (запятых) для двух других столбцов в первой строке. Вы можете попробовать создать пользовательскую схему, загрузить файл с помощью sc.textFile и проверить, имеет ли каждая строка количество столбцов, равное тому, что у вас есть в схеме.   -  person philantrovert    schedule 10.08.2017


Ответы (3)


По сути, ваш CSV-файл неправильно отформатирован в том смысле, что он не имеет одинакового количества столбцов в каждой строке, что требуется, если вы хотите прочитать его с помощью spark.read.csv. Однако вместо этого вы можете прочитать его с помощью spark.read.textFile, а затем проанализировать каждую строку.

Насколько я понимаю, вы заранее не знаете количество столбцов, поэтому хотите, чтобы ваш код обрабатывал произвольное количество столбцов. Для этого вам необходимо установить максимальное количество столбцов в вашем наборе данных, поэтому вам потребуется два прохода по вашему набору данных.

Для этой конкретной проблемы я бы использовал RDD вместо DataFrames или Datasets, например:

val data  = spark.read.textFile("file.csv").rdd

val rdd = data.map(s => (s, s.split(",").length)).cache
val maxColumns = rdd.map(_._2).max()

val x = rdd
  .map(row => {
    val rowData = row._1.split(",")
    val extraColumns = Array.ofDim[String](maxColumns - rowData.length)
    Row((rowData ++ extraColumns).toList:_*)
  })

Надеюсь, это поможет :)

person Glennie Helles Sindholt    schedule 10.08.2017

Вы можете прочитать его как набор данных только с одним столбцом (например, используя другой разделитель):

var df = spark.read.format("csv").option("delimiter",";").load("test.csv")
df.show()

+--------------+
|           _c0|
+--------------+
|      A1,B1,C1|
|   A2,B2,C2,D1|
|A3,B3,C3,D2,E1|
|   A4,B4,C4,D3|
|  A5,B5,C5,,E2|
+--------------+

Затем вы можете использовать этот ответ, чтобы вручную разделить столбец на пять, это добавит значения null когда элемент не существует:

var csvDF = df.withColumn("_tmp",split($"_c0",",")).select(
    $"_tmp".getItem(0).as("col1"),
    $"_tmp".getItem(1).as("col2"),
    $"_tmp".getItem(2).as("col3"),
    $"_tmp".getItem(3).as("col4"),
    $"_tmp".getItem(4).as("col5")
)
csvDF.show()

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|  A1|  B1|  C1|null|null|
|  A2|  B2|  C2|  D1|null|
|  A3|  B3|  C3|  D2|  E1|
|  A4|  B4|  C4|  D3|null|
|  A5|  B5|  C5|    |  E2|
+----+----+----+----+----+
person Fabich    schedule 10.08.2017

Если столбец dataTypes и количество столбцов известны, вы можете определить schema и применить schema при чтении файла csv как dataframe. Ниже я определил все пять столбцов как stringType

val schema = StructType(Seq(
  StructField("col1", StringType, true),
  StructField("col2", StringType, true),
  StructField("col3", StringType, true),
  StructField("col4", StringType, true),
  StructField("col5", StringType, true)))

val csvDF : DataFrame = sqlContext.read
  .option("header", "false")
  .option("delimiter", ",")
  .option("inferSchema", "false")
  .schema(schema)
  .csv("file.csv")

Вы должны получать dataframe как

+----+----+----+----+----+
|col1|col2|col3|col4|col5|
+----+----+----+----+----+
|A1  |B1  |C1  |null|null|
|A2  |B2  |C2  |D1  |null|
|A3  |B3  |C3  |D2  |E1  |
|A4  |B4  |C4  |D3  |null|
|A5  |B5  |C5  |null|E2  |
+----+----+----+----+----+
person Ramesh Maharjan    schedule 10.08.2017