Получение NullPointerException с использованием spark-csv с DataFrames

Просматривая spark-csv README, можно найти пример кода Java, подобный этому импортировать org.apache.spark.sql.SQLContext; импортировать org.apache.spark.sql.types.*;

SQLContext sqlContext = new SQLContext(sc);
StructType customSchema = new StructType(
    new StructField("year", IntegerType, true), 
    new StructField("make", StringType, true),
    new StructField("model", StringType, true),
    new StructField("comment", StringType, true),
    new StructField("blank", StringType, true));

DataFrame df = sqlContext.read()
    .format("com.databricks.spark.csv")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("cars.csv");

df.select("year", "model").write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("newcars.csv");

Он не скомпилировался из коробки, поэтому с некоторыми препирательствами я скомпилировал его, изменив неправильный синтаксис FooType на DataTypes.FooType и передав StructFields как new StructField[]; компилятор запросил четвертый аргумент для metadata в конструкторе StructField, но у меня возникли проблемы с поиском документации о том, что это значит (javadocs описывает его варианты использования, но не то, как решить, что передавать во время построения StructField). Со следующим кодом он теперь работает до тех пор, пока не возникнет какой-либо побочный эффект, такой как collect():

JavaSparkContext sc = new JavaSparkContext(conf);

SQLContext sqlContext = new SQLContext(sc);

// Read features.
System.out.println("Reading features from " + args[0]);
StructType featuresSchema = new StructType(new StructField[] {
    new StructField("case_id", DataTypes.StringType, false, null), 
    new StructField("foo", DataTypes.DoubleType, false, null)
});
DataFrame features = sqlContext.read()
    .format("com.databricks.spark.csv")
    .schema(featuresSchema)
    .load(args[0]);
for (Row r : features.collect()) {
  System.out.println("Row: " + r);
}

Я получаю следующее исключение:

Exception in thread "main" java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.expressions.AttributeReference.hashCode(namedExpressions.scala:202)
  at scala.runtime.ScalaRunTime$.hash(ScalaRunTime.scala:210)
  at scala.collection.immutable.HashSet.elemHashCode(HashSet.scala:65)
  at scala.collection.immutable.HashSet.computeHash(HashSet.scala:74)
  at scala.collection.immutable.HashSet.$plus(HashSet.scala:56)
  at scala.collection.immutable.HashSet.$plus(HashSet.scala:59)
  at scala.collection.immutable.Set$Set4.$plus(Set.scala:127)
  at scala.collection.immutable.Set$Set4.$plus(Set.scala:121)
  at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:24)
  at scala.collection.mutable.SetBuilder.$plus$eq(SetBuilder.scala:22)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
  at scala.collection.immutable.HashSet$HashSet1.foreach(HashSet.scala:153)
  at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
  at scala.collection.immutable.HashSet$HashTrieSet.foreach(HashSet.scala:306)
  at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
  at scala.collection.AbstractSet.scala$collection$SetLike$$super$map(Set.scala:47)
  at scala.collection.SetLike$class.map(SetLike.scala:93)
  at scala.collection.AbstractSet.map(Set.scala:47)
  at org.apache.spark.sql.catalyst.expressions.AttributeSet.foreach(AttributeSet.scala:114)
  at scala.collection.TraversableOnce$class.size(TraversableOnce.scala:105)
  at org.apache.spark.sql.catalyst.expressions.AttributeSet.size(AttributeSet.scala:56)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:307)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:282)
  at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:56)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$1.apply(QueryPlanner.scala:58)
  at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:59)
  at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan$lzycompute(SQLContext.scala:926)
  at org.apache.spark.sql.SQLContext$QueryExecution.sparkPlan(SQLContext.scala:924)
  at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan$lzycompute(SQLContext.scala:930)
  at org.apache.spark.sql.SQLContext$QueryExecution.executedPlan(SQLContext.scala:930)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:53)
  at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
  at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
...

Есть идеи, что случилось?


person Dennis Huo    schedule 21.12.2015    source источник


Ответы (2)


Кажется, что README очень устарел и нуждается в значительном редактировании примера Java. Я отследил фактическую JIRA, которая добавила поле метаданных, и это указывает на использование значения по умолчанию Map.empty для случаев Scala, и кто бы ни писал документацию, должно быть, только что перевел Scala непосредственно на Java, несмотря на отсутствие того же значения по умолчанию для входного параметра.

В ветвь 1.5 кода SparkSQL мы видим, что она ссылается на metadata.hashCode() без проверки, что и вызывает ошибку NullPointerException. Существование Metadata.empty() в сочетании с обсуждением использования пустых карт по умолчанию в Scala, по-видимому, подразумевает, что правильная реализация должна идти вперед и передавать Metadata.empty(), если вам это не нужно. Пересмотренный пример должен быть:

SQLContext sqlContext = new SQLContext(sc);
StructType customSchema = new StructType(new StructField[] {
    new StructField("year", DataTypes.IntegerType, true, Metadata.empty()), 
    new StructField("make", DataTypes.StringType, true, Metadata.empty()),
    new StructField("model", DataTypes.StringType, true, Metadata.empty()),
    new StructField("comment", DataTypes.StringType, true, Metadata.empty()),
    new StructField("blank", DataTypes.StringType, true, Metadata.empty())
});

DataFrame df = sqlContext.read()
    .format("com.databricks.spark.csv")
    .schema(customSchema)
    .option("header", "true")
    .load("cars.csv");

df.select("year", "model").write()
    .format("com.databricks.spark.csv")
    .option("header", "true")
    .save("newcars.csv");
person Dennis Huo    schedule 21.12.2015
comment
Отправлен запрос на получение в spark-csv для исправления документации. - person Dennis Huo; 21.12.2015
comment
Я думаю, вам нужно изменить свой ответ с .option(inferSchema, true) на .option(customSchema, true) - person Zahiro Mor; 10.01.2016
comment
Ах, действительно, вы правы, хороший улов, я обращал внимание только на строки, определяющие определение customSchema, когда пытался внести минимальные изменения в исходный пример. Отправил еще один запрос на вытягивание, чтобы исправить это в документации, и отредактировал ответ, чтобы он соответствовал . - person Dennis Huo; 11.01.2016

Даже я получаю такое же исключение. Я исправил это, предоставив метаданные.

поэтому измените код, как

StructType customSchema = new StructType(
new StructField("year", IntegerType, true,Metadata.empty()), 
new StructField("make", StringType, true,Metadata.empty()),
new StructField("model", StringType, true,Metadata.empty()),
new StructField("comment", StringType, true,Metadata.empty()),
new StructField("blank", StringType, true,Metadata.empty()));

это решит проблему

person Abhishyam    schedule 24.09.2017