Как исправить Dataflow, который не может сериализовать мой DoFn?

Когда я запускаю свой конвейер потока данных, я получаю приведенное ниже исключение, жалующееся на то, что мой DoFn не может быть сериализован. Как я могу это исправить?

Вот трассировка стека:

Caused by: java.lang.IllegalArgumentException: unable to serialize contrail.dataflow.AvroMRTransforms$AvroReducerDoFn@bba0fc2
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:51)
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.ensureSerializable(SerializableUtils.java:81)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.ensureSerializable(DirectPipelineRunner.java:784)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper(ParDo.java:1025)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateSingleHelper(ParDo.java:963)
    at com.google.cloud.dataflow.sdk.transforms.ParDo.access$000(ParDo.java:441)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:951)
    at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:946)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:611)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:200)
    at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:196)
    at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:109)
    at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:204)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:584)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:328)
    at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:70)
    at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:145)
    at contrail.stages.DataflowStage.stageMain(DataflowStage.java:51)
    at contrail.stages.NonMRStage.execute(NonMRStage.java:130)
    at contrail.stages.NonMRStage.run(NonMRStage.java:157)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
    at contrail.stages.ValidateGraphDataflow.main(ValidateGraphDataflow.java:139)
    ... 6 more
Caused by: java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at com.google.cloud.dataflow.sdk.util.SerializableUtils.serializeToByteArray(SerializableUtils.java:47)
    ... 27 more

person Jeremy    schedule 19.01.2015    source источник


Ответы (2)


Чтобы добавить к тому, что говорит Джереми...

Другая распространенная причина проблем с сериализуемостью — использование анонимного DoFn в нестатическом контексте. Анонимные внутренние классы имеют неявный указатель на окружающий класс, что также приводит к его сериализации.

person Frances    schedule 19.01.2015
comment
Хорошо... Это не имеет никакого смысла, но действительно, когда я меняю DoFn с анонимного на реальный класс, проблема исчезает. В моем случае я использую Kotlin, а не java. - person marcoseu; 16.05.2019

Если вы прокрутите трассировку стека, одна из причин четко идентифицирует данные, которые не сериализуются.

Caused by: java.io.NotSerializableException: org.apache.hadoop.mapred.JobConf

Проблема заключалась в том, что мой DoFn брал экземпляр JobConf в конструкторе и сохранял его в переменной экземпляра. Я предполагал, что JobConf можно сериализовать, но оказалось, что это не так.

Чтобы решить эту проблему, я сделал следующее

  • Я пометил переменную-член JobConf как временную, чтобы она не сериализовалась.
  • Я создал отдельную переменную типа byte[] для хранения сериализованной версии JobConf.
  • В моем конструкторе я сериализовал JobConf в byte[] и сохранил его в переменной экземпляра.
  • Я переопределил startBundle и десериализовал JobConf из byte[]

Вот суть с моим DoFn.

person Jeremy    schedule 19.01.2015
comment
Это решение очень близко к тому, что я нашел. Моя проблема заключается в сериализации схемы (avro) через функцию DoFn. Подход, который я нашел, заключается в том, чтобы передать строку схемы в конструктор класса Function, а затем выполнить синтаксический анализ в методе processElement(). Этот подход выполняет десериализацию схемы для каждой записи PCollection, которая должна быть преобразована, что замедляет производительность. Мне было интересно, ведет ли ваше решение то же самое или оно выполняет десериализацию/анализ только один раз, когда вы видите, что вы делаете это в методе startBound(). From java doc не указывается при выполнении. Спасибо - person Giuseppe Adaldo; 08.11.2016
comment
Как вы реализовали метод serializeJobConf()? - person Kakaji; 16.06.2017