Добавить столбец ADT в набор данных Spark?

Я хочу создать набор данных, содержащий столбец ADT. На основе этого вопроса: Кодировать иерархию ADT / запечатанных признаков в столбец Spark DataSet Я знаю, что есть решение, которое кодирует с помощью kryo, но это не очень полезно. Есть другой способ решить проблему, который намного лучше. Определим следующий ADT:

sealed case class Animal(sound: String)
object Cat extends Animal("miau")
object Dog extends Animal("wuff") 

и определите класс case, который использует Animal

case class Pet(name: String, sound: Animal)

Теперь я легко могу создать набор данных из Pet

val ds = List(Pet("Tom", Cat), Pet("Beethoven", Dog)).toDS
ds.show()
+---------+------+
|     name| sound|
+---------+------+
|      Tom|[miau]|
|Beethoven|[wuff]|
+---------+------+

Обратите внимание, что sound - это Struct, но извлечь элемент несложно:

ds.select("name", "sound.*").show()
+---------+-----+
|name     |sound|
+---------+-----+
|Tom      |miau |
|Beethoven|wuff |
+---------+-----+

Собственно, это последняя структура, которой я хочу добиться. Я столкнулся с двумя проблемами.

  1. Обычно не рекомендуется наследовать от класса case
  2. Исчерпывающее сопоставление с образцом запрашивает регистр по умолчанию

Пример проблемы 2:

 def getSound(animal: Animal): String = animal match {
   case Cat => Cat.sound
   case Dog => Dog.sound
   case _ => ""
 }

Чтобы решить проблему 2, я решил создать запечатанный абстрактный класс. Я тоже хочу сделать из него продукт

sealed abstract class Animal(sound: String) extends Product
case object Cat extends Animal("miau")
case object Dog extends Animal("wuff")

Теперь проблема 2 решена, и регистр по умолчанию больше не требуется. Однако я не могу создать набор данных из Animal. У меня следующее исключение: java.lang.RuntimeException: Error while encoding: java.lang.RuntimeException: Couldn't find sound on class Animal

Я действительно хочу добиться того же поведения, что и Option. Мы можем создать класс case, который содержит необязательное поле:

case class Person(name: String, age: Option[Int])
List(Person("Jack", Some(26)), Person("Julia", None)).toDS.show()
+-----+----+
| name| age|
+-----+----+
| Jack|  26|
|Julia|null|
+-----+----+

Я проверил реализацию Option, и это тоже запечатанный абстрактный класс, так что мне не хватает? Как кодируется опция для наборов данных?

ОБНОВЛЕНИЕ

Извините, последняя часть с Option здесь не имеет особого смысла, поскольку там вам нужно явно указать значение, которое вы хотите видеть в конце набора данных.

Но остается вопрос, как я могу кодировать столбец, созданный из ADT, с правильным сопоставлением с образцом.




Ответы (2)


Мне не хватало метода apply для моего класса Animal.

sealed abstract class Animal(val sound: String) extends Product with Serializable
  case object Cat extends Animal(sound = "miau")
  case object Dog extends Animal(sound = "wuff")
  object Animal {
    def apply(animal: Animal): String = animal match {
      case Cat => Cat.sound
      case Dog => Dog.sound
    }
  }

Используя это, я могу получить почти желаемый результат:

val ds = List(Pet("Tom", Cat), Pet("Beethoven", Dog)).toDS
ds.show()
+---------+------+
|     name| sound|
+---------+------+
|      Tom|[miau]|
|Beethoven|[wuff]|
+---------+------+
person sanyi14ka    schedule 18.12.2019
comment
Это никогда не будет работать в кластере из-за абстрактного класса. Выкинет RTE org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 44, Column 11: Cannot instantiate abstract "Animal" - person Atais; 02.07.2020
comment
Интересно, что я пробовал это в Spark Shell (версия 2.4.0-cdh6.3.1, клиентский режим) и сгенерировал набор данных с желаемой схемой. Я тоже попробую запустить в кластерном режиме, используя банку - person sanyi14ka; 12.08.2020

sanyi14ka фактически никогда не будет работать.

Тема кодировщиков ADT / Enum в DataSet не нова, но даже сегодня она не работает должным образом.

Возможно, вам пригодятся эти две ссылки:

person Atais    schedule 11.08.2020