крио-сериализация класса (объекта задачи) в apache spark возвращает значение null при десериализации

Я использую java spark API для написания тестового приложения. Я использую класс, который не расширяет сериализуемый интерфейс. Поэтому, чтобы заставить приложение работать, я использую сериализатор kryo для сериализации класса. Но проблема, которую я заметил во время отладки, заключалась в том, что во время десериализации возвращаемый объект класса становится нулевым и, в свою очередь, генерирует исключение нулевого указателя. Кажется, это проблема с закрытием, когда что-то идет не так, но я не уверен. Поскольку я новичок в такого рода сериализации, я не знаю, с чего начать копать.

Вот код, который я тестирую:

package org.apache.spark.examples;


import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.net.InetAddress;
import java.net.UnknownHostException;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;




/**
 * Spark application to test the Serialization issue in spark
 */
public class Test {

    static PrintWriter outputFileWriter;
    static FileWriter file;
    static JavaSparkContext ssc;

    public static void main(String[] args) {


        String inputFile = "/home/incubator-spark/examples/src/main/scala/org/apache/spark/examples/InputFile.txt";

        String master = "local";
        String jobName = "TestSerialization";
        String sparkHome = "/home/test/Spark_Installation/spark-0.7.0";
        String sparkJar = "/home/test/TestSerializationIssesInSpark/TestSparkSerIssueApp/target/TestSparkSerIssueApp-0.0.1-SNAPSHOT.jar";


        SparkConf conf = new SparkConf();
        conf.set("spark.closure.serializer","org.apache.spark.serializer.KryoSerializer");
        conf.set("spark.kryo.registrator", "org.apache.spark.examples.MyRegistrator");
        // create the Spark context
        if(master.equals("local")){
            ssc = new JavaSparkContext("local", jobName,conf);
            //ssc = new JavaSparkContext("local", jobName);
        } else {
            ssc = new JavaSparkContext(master, jobName, sparkHome, sparkJar);
        }
        JavaRDD<String> testData = ssc.textFile(inputFile).cache();
        final NotSerializableJavaClass notSerializableTestObject= new NotSerializableJavaClass("Hi ");
        @SuppressWarnings({ "serial", "unchecked"})
        JavaRDD<String> classificationResults = testData.map(
                new Function<String, String>() {
                    @Override
                    public String call(String inputRecord) throws Exception {                   
                        if(!inputRecord.isEmpty()) {
                            //String[] pointDimensions = inputRecord.split(",");
                            String result = "";

                            try {
                                FileWriter file = new FileWriter("/home/test/TestSerializationIssesInSpark/results/test_result_" + (int) (Math.random() * 100));
                                PrintWriter outputFile = new PrintWriter(file); 
                                InetAddress ip;
                                ip = InetAddress.getLocalHost();
                                outputFile.println("IP of the server: " + ip);

                                result = notSerializableTestObject.testMethod(inputRecord);
                                outputFile.println("Result: " + result);

                                outputFile.flush();
                                outputFile.close();
                                file.close();

                            } catch (UnknownHostException e) {
                                e.printStackTrace();
                            }
                            catch (IOException e1) {
                                e1.printStackTrace();
                            } 

                            return result;
                        } else {
                            System.out.println("End of elements in the stream.");
                            String result = "End of elements in the input data";
                            return result;
                        }
                    }

                }).cache(); 

        long processedRecords = classificationResults.count();

        ssc.stop();
        System.out.println("sssssssssss"+processedRecords);
    }
}

Вот класс KryoRegistrator

package org.apache.spark.examples;

import org.apache.spark.serializer.KryoRegistrator;

import com.esotericsoftware.kryo.Kryo;

public class MyRegistrator implements KryoRegistrator {
    public void registerClasses(Kryo kryo) {
        kryo.register(NotSerializableJavaClass.class);
    }
}

Вот класс, который я сериализую:

package org.apache.spark.examples;

public class NotSerializableJavaClass {
    public String testVariable;

    public NotSerializableJavaClass(String testVariable) {
        super();
        this.testVariable = testVariable;
    }

    public String testMethod(String vartoAppend){
        return this.testVariable + vartoAppend;
    }
}

person Harsh Gupta    schedule 21.04.2014    source источник
comment
Я заметил пару вещей; ваш NotSerializableJavaClass не имеет конструктора без аргументов. Крио это не нравится. Кроме того, я далеко не эксперт, но сериализация выглядит необычно. Я ожидал увидеть там Kryo kryo = new Kryo(); kryo.writeClassAndObject(output, objectToSerialise); или что-то подобное.   -  person Richard Tingle    schedule 21.04.2014
comment
Привет, Ричард! В API-интерфейсе spark реализован встроенный API-интерфейс kryo, который делает то же самое, что вы указали, выполнив Kryo kryo = new Kryo(); kryo.writeClassAndObject(output, objectToSerialise); . Но что вы предлагаете, как мне двигаться дальше? Поможет ли конструктор без аргументов решить эту проблему?   -  person Harsh Gupta    schedule 21.04.2014
comment
если бы вы могли легко добавить конструктор без аргументов, я бы это сделал. Я не могу обещать, что это решит вашу проблему, так как я не использовал крио в искре, но я знаю, что в 90% случаев, когда у меня возникает проблема, это либо конструктор без аргументов, либо вложенный класс.   -  person Richard Tingle    schedule 21.04.2014
comment
Хотя весь смысл того, что я пытался это сделать, заключался в том, что у нас есть ситуация, когда я могу получить сторонние банки, которые могут иметь любую структуру, и они могут быть не сериализуемы, и изменение структуры класса может победить весь смысл, но, тем не менее, это голова Начало . Спасибо   -  person Harsh Gupta    schedule 21.04.2014
comment
Харш, вы смогли решить свою ошибку с предложением?   -  person urug    schedule 26.06.2015


Ответы (1)


Это связано с тем, что spark.closure.serializer поддерживает только сериализатор Java. См. http://spark.apache.org/docs/latest/configuration.html о spark.closure.serializer

person zsxwing    schedule 20.10.2014
comment
Верно, но регистрация вашего класса Java в Kryo приводит к использованию сериализатора Java. Я сделал это с помощью BiMap от Guava. Без регистрации BiMap задачу сериализовать не удалось. - person pferrel; 29.12.2014
comment
Итак, еще одна проблема, с которой я сталкиваюсь, приводит к невозможности сериализации задачи из закрытия. Я использую внешнюю библиотеку scala в закрытии. Как определить, какой класс вызывает проблему. Насколько я знаю, это может быть транзитивная зависимость. - person pferrel; 29.12.2014
comment
Использование внешней библиотеки scala в замыкании допустимо, если вы убедитесь, что не используете их вне замыкания. Для отладки вы можете использовать параметр Java -Dsun.io.serialization.extendedDebugInfo=true. - person zsxwing; 30.12.2014
comment
Я читаю текстовый файл, используя rdd.textFile(source).map{closure} внутри закрытия. Я использую json4s (который использует java jackson) и получаю задачу, не сериализованную с ошибкой. Закрытие возвращает rdd классов case, поэтому мне не нужно использовать его снаружи. Почему задача не сериализована и почему я не могу использовать json4 вне замыкания? Нужно ли мне что-то регистрировать в Kryo? - person pferrel; 31.12.2014
comment
Например, вы создаете какой-то объект в json4s вне замыкания и используете его в замыкании. Затем при сериализации замыкания также необходимо сериализовать объект из json4s. Если объект не является сериализуемым, вы получите сериализуемый сбой. - person zsxwing; 31.12.2014
comment
Но вы говорите, что использование json4s вне замыкания — это нормально, и я знал это — не задействован ни искровой код, ни сериализация замыкания. Как я могу использовать его внутри закрытия? Я думал, вы сказали, что использовать внешнюю библиотеку scala в закрытии нормально? Использование его внутри вызывает ошибки, не сериализуемые задачей, поскольку замыкания распространяются через Spark. - person pferrel; 31.12.2014