Как отладить сериализуемое исключение во Flink?

Я столкнулся с несколькими сериализуемыми исключениями и провел поиск в Интернете и документации Flink; есть некоторые известные решения, такие как переходный процесс, расширение Serializable и т. д. Каждый раз происхождение исключения очень ясно, но в моем случае я не могу найти, где именно оно не сериализовано.

В: Как мне отладить такое исключение?

А.скала:

class executor ( val sink: SinkFunction[List[String]] {
    def exe(): Unit = {
        xxx.....addSink(sinks)
    }
}

Б.скала:

class Main extends App {
  def createSink: SinkFunction[List[String]] = new StringSink()

  object StringSink {
    // static
    val stringList: List[String] = List()
  }

  // create a testing sink
  class StringSink extends SinkFunction[List[String]] {
    override def invoke(strs: List[String]): Unit = {
        // add strs into the variable "stringList" of the compagin object StringSink
    }
  }

  new executor(createSink()).exe()

  // then do somethings with the strings
}

Исключение составляет:

Реализация SinkFunction не сериализуема. Объект, вероятно, содержит или ссылается на несериализуемые поля.

Два подозрительных момента, которые я обнаружил:

  1. Экземпляр StringSink передается в другой файл.
  2. В классе StringSink он использует статическую переменную stringList своего объекта compagin.

person Leyla Lee    schedule 22.12.2017    source источник


Ответы (2)


Я столкнулся с подобными проблемами. Раньше требовалось много времени, чтобы выяснить, какой член/объект не сериализуем. Журналы исключений не очень полезны.

Что мне помогло, так это следующая опция JVM, которая позволяет получить более подробную информацию в трассировке исключений.

Включить эту опцию...

-Dsun.io.serialization.extendedDebugInfo=true

person Bon Speedy    schedule 04.05.2018

Мое первое предположение было бы в том, что у вас нет конструктора без аргументов в StringSink

Правила для типов POJO Вырезано отсюда

Flink распознает тип данных как тип POJO (и разрешает ссылки на поля «по имени»), если выполняются следующие условия:

  1. Класс является общедоступным и автономным (без нестатического внутреннего класса).
  2. Класс имеет публичный конструктор без аргументов.
  3. Все нестатические, непереходные поля в классе (и во всех суперклассах) являются либо общедоступными (и не окончательными), либо имеют общедоступные методы получения и установки, которые следуют соглашениям об именах компонентов Java для получения и установки.

Просто добавьте конструктор без аргументов и повторите попытку.

    class StringSink extends SinkFunction[List[String]] {
        public StringSink() {
        }
        
        @override def invoke(strs: List[String]): Unit = {
            // add strs into the variable "stringList" of the compagin object StringSink
        }
}
person Mongo    schedule 24.04.2018