Spark: java.lang.UnsupportedOperationException: кодировщик для java.time.LocalDate не найден

Я пишу приложение Spark, используя версию 2.1.1. Следующий код получил ошибку при вызове метода с параметром LocalDate?

Exception in thread "main" java.lang.UnsupportedOperationException: No Encoder found for java.time.LocalDate
- field (class: "java.time.LocalDate", name: "_2")
- root class: "scala.Tuple2"
        at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:602)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:596)
        at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$9.apply(ScalaReflection.scala:587)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at scala.collection.immutable.List.flatMap(List.scala:344)
        at org.apache.spark.sql.catalyst.ScalaReflection$.org$apache$spark$sql$catalyst$ScalaReflection$$serializerFor(ScalaReflection.scala:587)
....
val date : LocalDate = ....
val conf = new SparkConf()
val sc = new SparkContext(conf.setAppName("Test").setMaster("local[*]"))
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val itemListJob = new ItemList(sqlContext, jdbcSqlConn)
import sqlContext.implicits._ 
val processed = itemListJob.run(rc, priority).select("id").map(d => {
  runJob.run(d, date) 
})

class ItemList(sqlContext: org.apache.spark.sql.SQLContext, jdbcSqlConn: String) {
  def run(date: LocalDate) = {
    import sqlContext.implicits._ 
    sqlContext.read.format("jdbc").options(Map(
      "driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
      "url" -> jdbcSqlConn,
      "dbtable" -> s"dbo.GetList('$date')"
    )).load()
    .select("id") 
    .as[Int] 
  }
}

Обновление: я изменил тип возвращаемого значения runJob.run() на кортеж (int, java.sql.Date) и изменил код лямбды .map(...) на

val processed = itemListJob.run(rc, priority).select("id").map(d => {
  val (a,b) = runJob.run(d, date) 
  $"$a, $b"
})

Теперь ошибка изменилась на

[error] C:\....\scala\main.scala:40: Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases. 
[error]     val processed = itemListJob.run(rc, priority).map(d => { 
[error]                                                      ^ 
[error] one error found 
[error] (compile:compileIncremental) Compilation failed

person ca9163d9    schedule 24.05.2017    source источник
comment
Добавьте версию Spark и используемую сериализацию (если вы измените значение по умолчанию).   -  person Zernike    schedule 25.05.2017
comment
Моя искровая версия - 2.1.1 на моем локальном компьютере для разработки. Я ничего не менял в сериализации (настройка по умолчанию).   -  person ca9163d9    schedule 25.05.2017
comment
Измените runJob.run(d, date), чтобы он возвращал некоторый класс, который понимает Spark SQL, например java.util.Date.   -  person zsxwing    schedule 25.05.2017
comment
@zsxwing Спасибо, я изменил код, как вы предложили. Однако теперь у него появилась новая ошибка. Я попытался добавить import sqlContext.implicits._ в лямбду, переданную функции map(), но это не помогло.   -  person ca9163d9    schedule 25.05.2017
comment
Оператор import не следует добавлять внутри лямбда, потому что он будет использоваться map. Просто добавьте его над этой строкой val processed = itemListJob.run(rc, priority).map(d => {.   -  person zsxwing    schedule 25.05.2017
comment
Да, это уже было import sqlContext.implicits._ прямо перед строкой val processed = itemListJob.run(rc, priority).map(d => {. Но все равно возникает ошибка?   -  person ca9163d9    schedule 25.05.2017
comment
@zsxwing, неважно. Ошибка вызвана тем, что я написал $"..." вместо s"....".   -  person ca9163d9    schedule 25.05.2017
comment
@zsxwing, код теперь работает успешно. Теперь у меня вопрос о параллелизме. Вопрос в том, stackoverflow.com/questions/44169588 /. Вы можете это прокомментировать? Большое спасибо!   -  person ca9163d9    schedule 25.05.2017


Ответы (1)


для настраиваемого типа набора данных вы можете использовать структуру Kyro serde, если ваши данные действительно сериализуемы (также известные как Serializable). вот один пример использования Kyro: Spark не найден кодировщик для java.io.Serializable в Map [String, java.io.Serializable].

Kyro всегда рекомендуется, так как он намного быстрее и совместим с Java-фреймворком serde. вы определенно можете выбрать собственный serde Java (ObjectWriter / ObjectReader), но он намного медленнее.

как и в комментариях выше, SparkSQL поставляется с множеством полезных кодировщиков под sqlContext.implicits._, но они не охватывают все, поэтому вам, возможно, придется подключить свой собственный кодировщик.

Как я уже сказал, ваши пользовательские данные должны быть сериализуемыми, и согласно https://docs.oracle.com/javase/8/docs/api/java/time/LocalDate.html, он реализует интерфейс Serializable, так что вам здесь определенно хорошо.

person linehrr    schedule 18.11.2018