Почему чтение из CSV завершается с ошибкой NumberFormatException?

Я использую Spark 1.6.0 и Scala 2.10.5.

$ spark-shell --packages com.databricks:spark-csv_2.10:1.5.0

import org.apache.spark.sql.SQLContext   
import sqlContext.implicits._    
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

val bankSchema = StructType(Array(
  StructField("age", IntegerType, true),
  StructField("job", StringType, true),
  StructField("marital", StringType, true),
  StructField("education", StringType, true),
  StructField("default", StringType, true),
  StructField("balance", IntegerType, true),
  StructField("housing", StringType, true),
  StructField("loan", StringType, true),
  StructField("contact", StringType, true),
  StructField("day", IntegerType, true),
  StructField("month", StringType, true),
  StructField("duration", IntegerType, true),
  StructField("campaign", IntegerType, true),
  StructField("pdays", IntegerType, true),
  StructField("previous", IntegerType, true),
  StructField("poutcome", StringType, true),
  StructField("y", StringType, true)))

val market_details = sqlContext.
  read.
  format("com.databricks.spark.csv").
  option("header", "true").
  schema(bankSchema).
  load("/user/sachnil.2007_gmail/Project1_dataset_bank-full.csv")    
market_details.registerTempTable("phone_table")    
val temp = sqlContext.sql("SELECT * FROM phone_table").show()

Ошибка, которую я получаю:

17/05/14 06:11:42 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1) java.lang.NumberFormatException: For input string: "58;"management";"married";"tertiary";"no";2143;"yes";"no";"unknown";5;"may";261;1;-1;0;"unknown";"no"" at 
    java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at 
    java.lang.Integer.parseInt(Integer.java:580) at 
    java.lang.Integer.parseInt(Integer.java:615) at 
    scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229) at 
    scala.collection.immutable.StringOps.toInt(StringOps.scala:31) at 
    com.databricks.spark.csv.util.TypeCast$.castTo(TypeCast.scala:61) at 
    com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$2.apply(CsvRelation.scala:121) at 
    com.databricks.spark.csv.CsvRelation$$anonfun$buildScan$2.apply(CsvRelation.scala:108) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at 
    scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:308) at 
    scala.collection.Iterator$class.foreach(Iterator.scala:727) at scala.collection.AbstractIterator.foreach(Iterator.scala:1157) at 
    scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103) at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47) at 
    scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273) at scala.collection.AbstractIterator.to(Iterator.scala:1157) at 
    scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265) at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157) at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252) at 
    scala.collection.AbstractIterator.toArray(Iterator.scala:1157) at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212) at 
    org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)

Содержимое CSV выглядит так:

"age";"job";"marital";"education";"default";"balance";"housing";"loan";"contact";"day";"month";"duration";"campaign";"pdays";"previous";"poutcome";"y"
58;"management";"married";"tertiary";"no";2143;"yes";"no";"unknown";5;"may";261;1;-1;0;"unknown";"no"
44;"technician";"single";"secondary";"no";29;"yes";"no";"unknown";5;"may";151;1;-1;0;"unknown";"no"
33;"entrepreneur";"married";"secondary";"no";2;"yes";"yes";"unknown";5;"may";76;1;-1;0;"unknown";"no"
47;"blue-collar";"married";"unknown";"no";1506;"yes";"no";"unknown";5;"may";92;1;-1;0;"unknown";"no"

Как я могу это решить?


person codelover    schedule 14.05.2017    source источник
comment
Я изложил ваш пост в соответствии с ожиданиями SO; он все еще не идеален, не сдается из-за длинной строки кода, но я не знаком с языком, поэтому не хотел его менять. Кроме того, я удаляю вашу просьбу о помощи в конце, потому что это дано и не поощряется здесь; вы на самом деле не задали вопрос. Укажите, в чем вы на самом деле нуждаетесь в помощи и что вы пытались решить до сих пор, и вы можете обнаружить, что получите помощь, которую ищете.   -  person lukkea    schedule 14.05.2017
comment
На самом деле, мне нужно знать, где я ошибаюсь при загрузке CSV во фрейм данных.   -  person codelover    schedule 14.05.2017


Ответы (2)


Spark 1.6.0 настолько устарел, что в наши дни его почти никто не поддерживает (если только он не является частью коммерческой поддержки). Я настоятельно рекомендую обновиться до последней версии 2.1.1, которая дает вам большой выбор.


Позвольте мне начать с этого: в моей пользовательской сборке 2.3.0-SNAPSHOT загрузка CSV-файла просто работает, поэтому я думаю, что вы могли столкнуться с какой-то неподдерживаемой функцией spark- csv в той версии, которую вы используете.

Обратите внимание, что модуль spark-csv был интегрирован в Spark, начиная с версии Spark 2+ (одна из многих причин, по которой вам следует обновить Spark).


Если вы настаиваете на настраиваемой схеме (которую вы могли бы позволить Spark определить самостоятельно при использовании параметра inferSchema), используйте по крайней мере DSL для сокращения нажатий клавиш:

import org.apache.spark.sql.types._

val bankSchema = StructType(
  $"age".int ::
  $"job".string ::
  $"marital".string ::
  $"education".string ::
  $"default".string ::
  $"balance".int ::
  $"housing".string ::
  $"loan".string ::
  $"contact".string ::
  $"day".int ::
  $"month".string ::
  $"duration".int ::
  $"campaign".int ::
  $"pdays".int ::
  $"previous".int ::
  $"poutcome".string ::
  $"y".string ::
  Nil)

scala> println(bankSchema.treeString)
root
 |-- age: integer (nullable = true)
 |-- job: string (nullable = true)
 |-- marital: string (nullable = true)
 |-- education: string (nullable = true)
 |-- default: string (nullable = true)
 |-- balance: integer (nullable = true)
 |-- housing: string (nullable = true)
 |-- loan: string (nullable = true)
 |-- contact: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- duration: integer (nullable = true)
 |-- campaign: integer (nullable = true)
 |-- pdays: integer (nullable = true)
 |-- previous: integer (nullable = true)
 |-- poutcome: string (nullable = true)
 |-- y: string (nullable = true)

Если вы разрабатываете приложения Spark с помощью Scala, я настоятельно рекомендую описывать схему с использованием класса case и использовать кодировщики (это снова Spark 2+).

case class Market(
  age: Int,
  job: String,
  marital: String,
  education: String,
  default: String,
  balance: Int,
  housing: String,
  loan: String,
  contact: String,
  day: Int,
  month: String,
  duration: Int,
  campaign: Int,
  pdays: Int,
  previous: Int,
  poutcome: String,
  y: String)
import org.apache.spark.sql.Encoders
scala> val bankSchema = Encoders.product[Market]
java.lang.UnsupportedOperationException: `default` is a reserved keyword and cannot be used as field name
- root class: "Market"
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:611)
  at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$10.apply(ScalaReflection.scala:609)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
  at scala.collection.immutable.List.foreach(List.scala:381)
  at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
  at scala.collection.immutable.List.flatMap(List.scala:344)
  at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:609)
  at org.apache.spark.sql.catalyst.ScalaReflection$.serializerFor(ScalaReflection.scala:440)
  at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder$.apply(ExpressionEncoder.scala:71)
  at org.apache.spark.sql.Encoders$.product(Encoders.scala:275)
  ... 48 elided

(В данном конкретном случае это невозможно из-за зарезервированного ключевого слова default, которого вы, возможно, также захотите избежать в своей построенной вручную схеме).


После того, как у вас есть чтение схемы, вы не получите ошибок с образцом, который вы включили в вопрос:

val marketDetails = spark.
  read.
  schema(bankSchema).
  option("header", true).
  option("delimiter", ";").
  csv("market_details.csv")

scala> marketDetails.show
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
| 58|  management|married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|
| 44|  technician| single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|
| 33|entrepreneur|married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|
| 47| blue-collar|married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+

Что мне действительно нравится в Spark SQL, так это то, что вы можете придерживаться чистого SQL, если это ваш предпочтительный «язык» в Spark.

val q = """
  CREATE OR REPLACE TEMPORARY VIEW phone_table
  USING csv
  OPTIONS (
    inferSchema true,
    header true,
    delimiter ';',
    path 'market_details.csv')"""

// execute the above query and discard the result
// we're only interested in the side effect of creating a temp view
sql(q).collect

scala> sql("select * from phone_table").show
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
|age|         job|marital|education|default|balance|housing|loan|contact|day|month|duration|campaign|pdays|previous|poutcome|  y|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+
| 58|  management|married| tertiary|     no|   2143|    yes|  no|unknown|  5|  may|     261|       1|   -1|       0| unknown| no|
| 44|  technician| single|secondary|     no|     29|    yes|  no|unknown|  5|  may|     151|       1|   -1|       0| unknown| no|
| 33|entrepreneur|married|secondary|     no|      2|    yes| yes|unknown|  5|  may|      76|       1|   -1|       0| unknown| no|
| 47| blue-collar|married|  unknown|     no|   1506|    yes|  no|unknown|  5|  may|      92|       1|   -1|       0| unknown| no|
+---+------------+-------+---------+-------+-------+-------+----+-------+---+-----+--------+--------+-----+--------+--------+---+

ПОДСКАЗКА: Используйте spark-sql, и вы можете полностью оставить Scala в стороне.

person Jacek Laskowski    schedule 14.05.2017
comment
Я не могу обновить свой Spark из-за ограничений среды. Спасибо за решение. - person codelover; 14.05.2017
comment
Вы будете удивлены, как сильно вы ошибаетесь, и вы можете использовать любую версию Spark, вы хотите (и среда кластера не имеет ничего общего с используемой версией Spark). Это просто sbt-сборка. - person Jacek Laskowski; 14.05.2017
comment
@JacekLaskowski -- Spark 1.6.0 настолько устарел, что в наши дни почти никто не поддерживает его. › CDH поставляется с Spark 1.6.0, и я считаю, что у Cloudera большой рынок. доля поддержки (конечно, хитрость в том, что они перенесли большинство исправлений ошибок и регрессий более поздней версии 1.6.x, притворяясь, что это все еще версия 1.6.0) - person Samson Scharfrichter; 14.05.2017
comment
@SamsonScharfrichter Верно! Они должны поддерживать своих клиентов, поэтому я не удивлен, что они это делают. Дело в том, что Spark 1.6.0 больше не поддерживается исходником, то есть самим проектом Spark. - person Jacek Laskowski; 14.05.2017

Здесь, кажется, есть две проблемы:

  1. Разделитель CSV

В ваших данных CSV используется ; в качестве разделителя вы должны добавить следующее

.option("delimiter", ";")

Для операции чтения, чтобы использовать указание spark использовать правильный разделитель

val market_details = sqlContext.read.format("com.databricks.spark.csv")
.option("header", "true")
.schema(bankSchema)
.option("delimiter", ";")
.load("/user/sachnil.2007_gmail/Project1_dataset_bank-full.csv")    

Подробнее о формате csv spark-csv

разделитель: по умолчанию столбцы разделяются с помощью , но разделитель может быть установлен на любой символ

  1. Входные данные включают кавычки ("")

Ваши входные данные содержат ненужные "
Пожалуйста, удалите " из входного CSV-файла и запустите его снова (пример ввода PSB):

age;job;marital;education;default;balance;housing;loan;contact;day;month;duration;campaign;pdays;previous;poutcome;y
58;management;married;tertiary;no;2143;yes;no;unknown;5;may;261;1;-1;0;unknown;no
44;technician;single;secondary;no;29;yes;no;unknown;5;may;151;1;-1;0;unknown;no
33;entrepreneur;married;secondary;no;2;yes;yes;unknown;5;may;76;1;-1;0;unknown;no
47;blue-collar;married;unknown;no;1506;yes;no;unknown;5;may;92;1;-1;0;unknown;no

Здесь вы можете найти примеры spark-sql-csv

В примере Детские имена используются следующие входные данные CSV (название, за которым следуют образцы, без кавычек):

Year,First Name,County,Sex,Count
2013,GAVIN,ST LAWRENCE,M,9
2013,LEVI,ST LAWRENCE,M,9
2013,LOGAN,NEW YORK,M,44
person Yaron    schedule 14.05.2017
comment
Я пытался использовать .option(delimiter,;), но все равно сталкиваюсь с ошибкой: scala› val temp = sqlContext.sql(SELECT * FROM phone_table).show() 17/05/14 09:33:46 ОШИБКА Исполнитель: Исключение в задача 0.0 на этапе 1.0 (TID 1) java.lang.NumberFormatException: Для входной строки: 58;управление;женат;третичный;нет;2143;да;нет;неизвестно;5;может;261;1;-1;0; неизвестно; нет в java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) в java.lang.Integer.parseInt(Integer.java:580) - person codelover; 14.05.2017
comment
scala› val temp = sqlContext.sql(SELECT * FROM phone_table).show() 17/05/14 09:33:46 ОШИБКА Исполнитель: Исключение в задаче 0.0 на этапе 1.0 (TID 1) java.lang.NumberFormatException: Для ввода строка: 58;управление;женат;третичный;нет;2143;да;нет;неизвестно;5;может;261;1;-1;0;неизвестно;нет в java.lang.NumberFormatException.forInputString(NumberFormatException.java:65 ) в java.lang.Integer.parseInt(Integer.java:580) в java.lang.Integer.parseInt(Integer.java:615) в scala.collection.immutable.StringLike$class.toInt(StringLike.scala:229) - person codelover; 14.05.2017
comment
@Sachinjuneja - обновил мой ответ и добавил еще одну проблему, которую следует исправить (знак кавычки () во входных данных) - person Yaron; 14.05.2017
comment
Огромное спасибо. Сейчас это работает. Я удалил цитату. Я сделал это вручную в файле CSV, а затем загрузил его в фрейм данных. Есть ли возможность сделать это программно? - person codelover; 14.05.2017