Устранение SparkException: задача не сериализуется при импорте модели PMML

Я хочу импортировать модель PMML, чтобы вычислить оценку с помощью Spark. Все работает нормально, когда я не использую искру, но я не могу использовать свой метод в маппере.

Проблема в том, что мне нужен объект Evaluation из org.jpmml.evaluator.Evaluator, который, похоже, не сериализуем. Итак, я попытался сделать это Serialiazable со следующим классом:

package util;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.jpmml.evaluator.Evaluator;

public class SerializableEvaluator implements Serializable {

    private static final long serialVersionUID = 6631604036553063657L;
    private Evaluator evaluator;

    public SerializableEvaluator(Evaluator evaluator) {
        this.evaluator = evaluator;
    }

    public Evaluator getEvaluator() {
        return evaluator;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.writeObject(evaluator);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        Evaluator eval = (Evaluator) in.readObject();
    }
}

Я также сделал все свои классы сериализуемыми.

Вот пример моего кода:

        logger.info("Print 5 first rows----------------------------");
        strTitanicRDD
                .take(5)
                .forEach(row -> logger.info(row));
        logger.info("Print 5 first Titatnic Obs---------------------");
        strTitanicRDD
                .map(row -> new TitanicObservation(row))
                .take(5)
                .forEach(titanic -> logger.info(titanic.toString()));
        logger.info("Print 5 first Scored Titatnic Obs---------------");

        try{strTitanicRDD
            .map(row -> new TitanicObservation(row))
            .map(
                new Function<TitanicObservation,String>(){

                    private static final long serialVersionUID = -2968122030659306400L;

                    @Override
                    public String call(TitanicObservation titanic) throws Exception {
                        String res = PmmlUtil.computeScoreTitanic(evaluator, titanic);
                        return res;
                    }

                })
        .take(5)
        .forEach(row -> logger.info(row));

Но я не думаю, что мой код поможет вам решить мою проблему, которая очень ясна (см. логи :)

org.apache.spark.SparkException: задача не сериализуема в org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) в org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) ) в org.apache.spark.SparkContext.clean(SparkContext.scala:1623) в org.apache.spark.rdd.RDD.map(RDD.scala:286) в org.apache.spark.api.java.JavaRDDLike$ class.map(JavaRDDLike.scala:89) в org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46) в score.acv.AppWithSpark.main(AppWithSpark.java:117) в sun.reflect .NativeMethodAccessorImpl.invoke0(собственный метод) в sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) в sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke(метод .java:497) по адресу org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$Spar kSubmit$$runMain(SparkSubmit.scala:577) по адресу org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:174) по адресу org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala: 197) по адресу org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) по адресу org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Вызвано: java.io.NotSerializableException: org.xml.sax.helpers.LocatorImpl Стек сериализации:

    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 15 more

person AntonyBrd    schedule 17.02.2016    source источник


Ответы (1)


За интерфейсом org.jpmml.evaluator.Evaluator находится экземпляр некоторого подкласса org.jpmml.evaluator.ModelEvaluator. Класс ModelEvaluator и все его подклассы сериализуемы по своей конструкции. Проблема связана с экземпляром объекта org.dmg.pmml.PMML, который вы предоставили методу ModelEvaluatorFactory#newModelManager(PMML) в начале.

Короче говоря, каждый объект модели класса PMML может иметь присоединенную к нему информацию SAX Locator. Это полезно на этапах разработки и тестирования для обнаружения оскорбительного содержимого XML. Однако на этапе производства эта информация больше не должна храниться. Вы можете отключить информацию SAX Locator, либо правильно настроив среду выполнения JAXB, либо просто очистив существующие экземпляры SAX Locator, вызвав PMMLObject#setLocator(Locatable) с аргументом null. Последняя функциональность формализована классом org.jpmml.model.visitors.LocatorNullifier Visitor.

Полный пример см. в служебном классе org.jpmml.spark.EvaluatorUtil (особенно в строках с 73 по 75) официального сайта JPMML-Spark. проект. Почему бы вам вообще не использовать JPMML-Spark?

person user1808924    schedule 17.02.2016
comment
Большое вам спасибо за вашу помощь. Я не использовал Jpmml-spark, потому что для этого требуется Spark 1.5, а моему приложению может потребоваться работа со Spark 1.3.1. Поскольку мне удалось вычислить счет с помощью JPMML, я подумал, что будет просто поместить мою функцию в картограф. Мне очень интересен ваш опыт, применяли ли вы PMML в производственном контексте? Является ли spark-JPMML жизнеспособной альтернативой, по вашему мнению? - person AntonyBrd; 18.02.2016