Я хочу импортировать модель PMML, чтобы вычислить оценку с помощью Spark. Все работает нормально, когда я не использую искру, но я не могу использовать свой метод в маппере.
Проблема в том, что мне нужен объект Evaluation из org.jpmml.evaluator.Evaluator, который, похоже, не сериализуем. Итак, я попытался сделать это Serialiazable со следующим классом:
package util;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import org.jpmml.evaluator.Evaluator;
public class SerializableEvaluator implements Serializable {
private static final long serialVersionUID = 6631604036553063657L;
private Evaluator evaluator;
public SerializableEvaluator(Evaluator evaluator) {
this.evaluator = evaluator;
}
public Evaluator getEvaluator() {
return evaluator;
}
private void writeObject(ObjectOutputStream out) throws IOException {
out.writeObject(evaluator);
}
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
Evaluator eval = (Evaluator) in.readObject();
}
}
Я также сделал все свои классы сериализуемыми.
Вот пример моего кода:
logger.info("Print 5 first rows----------------------------");
strTitanicRDD
.take(5)
.forEach(row -> logger.info(row));
logger.info("Print 5 first Titatnic Obs---------------------");
strTitanicRDD
.map(row -> new TitanicObservation(row))
.take(5)
.forEach(titanic -> logger.info(titanic.toString()));
logger.info("Print 5 first Scored Titatnic Obs---------------");
try{strTitanicRDD
.map(row -> new TitanicObservation(row))
.map(
new Function<TitanicObservation,String>(){
private static final long serialVersionUID = -2968122030659306400L;
@Override
public String call(TitanicObservation titanic) throws Exception {
String res = PmmlUtil.computeScoreTitanic(evaluator, titanic);
return res;
}
})
.take(5)
.forEach(row -> logger.info(row));
Но я не думаю, что мой код поможет вам решить мою проблему, которая очень ясна (см. логи :)
org.apache.spark.SparkException: задача не сериализуема в org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) в org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) ) в org.apache.spark.SparkContext.clean(SparkContext.scala:1623) в org.apache.spark.rdd.RDD.map(RDD.scala:286) в org.apache.spark.api.java.JavaRDDLike$ class.map(JavaRDDLike.scala:89) в org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46) в score.acv.AppWithSpark.main(AppWithSpark.java:117) в sun.reflect .NativeMethodAccessorImpl.invoke0(собственный метод) в sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) в sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) в java.lang.reflect.Method.invoke(метод .java:497) по адресу org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$Spar kSubmit$$runMain(SparkSubmit.scala:577) по адресу org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:174) по адресу org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala: 197) по адресу org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) по адресу org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Вызвано: java.io.NotSerializableException: org.xml.sax.helpers.LocatorImpl Стек сериализации:
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38) at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47) at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80) at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) ... 15 more