Я пишу приложение Spark, используя версию 2.1.1. Следующий код получил ошибку при вызове метода с параметром LocalDate?
Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate - field (class: "java.time.LocalDate", name: "_2") - root class: "scala.Tuple2" at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596) at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) at scala.collection.immutable.List.flatMap(List.scala:344) at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:587) ....
val date : LocalDate = ....
val conf = new SparkConf()
val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
val itemListJob = new ItemList(sqlContext, jdbcSqlConn)
import sqlContext.implicits._
val processed = itemListJob.run(rc, priority).select("id").map(d => {
runJob.run(d, date)
})
class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
def run(date: LocalDate) = {
import sqlContext.implicits._
sqlContext.read.format("jdbc").options(Map(
"driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
"url" -> jdbcSqlConn,
"dbtable" -> s"dbo.GetList('$date')"
)).load()
.select("id")
.as[Int]
}
}
Обновление: я изменил тип возвращаемого значения runJob.run()
на кортеж (int, java.sql.Date)
и изменил код лямбды .map(...)
на
val processed = itemListJob.run(rc, priority).select("id").map(d => {
val (a,b) = runJob.run(d, date)
$"$a, $b"
})
Теперь ошибка изменилась на
[error] C:\....\scala\main.scala:40: Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases. [error] val processed = itemListJob.run(rc, priority).map(d => { [error] ^ [error] one error found [error] (compile:compileIncremental) Compilation failed
runJob.run(d, date)
, чтобы он возвращал некоторый класс, который понимает Spark SQL, напримерjava.util.Date
. - person zsxwing   schedule 25.05.2017import sqlContext.implicits._
в лямбду, переданную функцииmap()
, но это не помогло. - person ca9163d9   schedule 25.05.2017import
не следует добавлять внутри лямбда, потому что он будет использоватьсяmap
. Просто добавьте его над этой строкойval processed = itemListJob.run(rc, priority).map(d => {
. - person zsxwing   schedule 25.05.2017import sqlContext.implicits._
прямо перед строкойval processed = itemListJob.run(rc, priority).map(d => {
. Но все равно возникает ошибка? - person ca9163d9   schedule 25.05.2017$"..."
вместоs"...."
. - person ca9163d9   schedule 25.05.2017