Я использую java spark API для написания тестового приложения. Я использую класс, который не расширяет сериализуемый интерфейс. Поэтому, чтобы заставить приложение работать, я использую сериализатор kryo для сериализации класса. Но проблема, которую я заметил во время отладки, заключалась в том, что во время десериализации возвращаемый объект класса становится нулевым и, в свою очередь, генерирует исключение нулевого указателя. Кажется, это проблема с закрытием, когда что-то идет не так, но я не уверен. Поскольку я новичок в такого рода сериализации, я не знаю, с чего начать копать.
Вот код, который я тестирую:
package org.apache.spark.examples;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
/**
* Spark application to test the Serialization issue in spark
*/
public class Test {
static PrintWriter outputFileWriter;
static FileWriter file;
static JavaSparkContext ssc;
public static void main(String[] args) {
String inputFile = "/home/incubator-spark/examples/src/main/scala/org/apache/spark/examples/InputFile.txt";
String master = "local";
String jobName = "TestSerialization";
String sparkHome = "/home/test/Spark_Installation/spark-0.7.0";
String sparkJar = "/home/test/TestSerializationIssesInSpark/TestSparkSerIssueApp/target/TestSparkSerIssueApp-0.0.1-SNAPSHOT.jar";
SparkConf conf = new SparkConf();
conf.set("spark.closure.serializer","org.apache.spark.serializer.KryoSerializer");
conf.set("spark.kryo.registrator", "org.apache.spark.examples.MyRegistrator");
// create the Spark context
if(master.equals("local")){
ssc = new JavaSparkContext("local", jobName,conf);
//ssc = new JavaSparkContext("local", jobName);
} else {
ssc = new JavaSparkContext(master, jobName, sparkHome, sparkJar);
}
JavaRDD<String> testData = ssc.textFile(inputFile).cache();
final NotSerializableJavaClass notSerializableTestObject= new NotSerializableJavaClass("Hi ");
@SuppressWarnings({ "serial", "unchecked"})
JavaRDD<String> classificationResults = testData.map(
new Function<String, String>() {
@Override
public String call(String inputRecord) throws Exception {
if(!inputRecord.isEmpty()) {
//String[] pointDimensions = inputRecord.split(",");
String result = "";
try {
FileWriter file = new FileWriter("/home/test/TestSerializationIssesInSpark/results/test_result_" + (int) (Math.random() * 100));
PrintWriter outputFile = new PrintWriter(file);
InetAddress ip;
ip = InetAddress.getLocalHost();
outputFile.println("IP of the server: " + ip);
result = notSerializableTestObject.testMethod(inputRecord);
outputFile.println("Result: " + result);
outputFile.flush();
outputFile.close();
file.close();
} catch (UnknownHostException e) {
e.printStackTrace();
}
catch (IOException e1) {
e1.printStackTrace();
}
return result;
} else {
System.out.println("End of elements in the stream.");
String result = "End of elements in the input data";
return result;
}
}
}).cache();
long processedRecords = classificationResults.count();
ssc.stop();
System.out.println("sssssssssss"+processedRecords);
}
}
Вот класс KryoRegistrator
package org.apache.spark.examples;
import org.apache.spark.serializer.KryoRegistrator;
import com.esotericsoftware.kryo.Kryo;
public class MyRegistrator implements KryoRegistrator {
public void registerClasses(Kryo kryo) {
kryo.register(NotSerializableJavaClass.class);
}
}
Вот класс, который я сериализую:
package org.apache.spark.examples;
public class NotSerializableJavaClass {
public String testVariable;
public NotSerializableJavaClass(String testVariable) {
super();
this.testVariable = testVariable;
}
public String testMethod(String vartoAppend){
return this.testVariable + vartoAppend;
}
}
Kryo kryo = new Kryo(); kryo.writeClassAndObject(output, objectToSerialise);
или что-то подобное. - person Richard Tingle   schedule 21.04.2014Kryo kryo = new Kryo(); kryo.writeClassAndObject(output, objectToSerialise);
. Но что вы предлагаете, как мне двигаться дальше? Поможет ли конструктор без аргументов решить эту проблему? - person Harsh Gupta   schedule 21.04.2014