RDD не сериализуемый API-интерфейс Cassandra/Spark для соединителя Java

поэтому у меня ранее были вопросы о том, как запросить cassandra с помощью spark в проекте java maven здесь: Запрос данных в Cassandra через Spark в проекте Java Maven

Ну, на мой вопрос ответили, и это сработало, однако я столкнулся с проблемой (возможно, проблемой). Теперь я пытаюсь использовать Java API datastax. Вот мой код:

package com.angel.testspark.test2;

import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

import java.io.Serializable;

import static com.datastax.spark.connector.CassandraJavaUtil.*;


public class App 
{

    // firstly, we define a bean class
    public static class Person implements Serializable {
        private Integer id;
        private String fname;
        private String lname;
        private String role;

        // Remember to declare no-args constructor
        public Person() { }

        public Integer getId() { return id; }
        public void setId(Integer id) { this.id = id; }

        public String getfname() { return fname; }
        public void setfname(String fname) { this.fname = fname; }

        public String getlname() { return lname; }
        public void setlname(String lname) { this.lname = lname; }

        public String getrole() { return role; }
        public void setrole(String role) { this.role = role; }

        // other methods, constructors, etc.
    }

    private transient SparkConf conf;
    private App(SparkConf conf) {
        this.conf = conf;
    }


    private void run() {
        JavaSparkContext sc = new JavaSparkContext(conf);
        createSchema(sc);


        sc.stop();
    }

    private void createSchema(JavaSparkContext sc) {

        JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
                .where("role=?", "IT Engineer").map(new Function<Person, String>() {
                    @Override
                    public String call(Person person) throws Exception {
                        return person.toString();
                    }
                });
        System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
               }



    public static void main( String[] args )
    {
        if (args.length != 2) {
            System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
            System.exit(1);
        }

        SparkConf conf = new SparkConf();
        conf.setAppName("Java API demo");
        conf.setMaster(args[0]);
        conf.set("spark.cassandra.connection.host", args[1]);

        App app = new App(conf);
        app.run();
    }
}

вот моя ошибка:

Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.angel.testspark.test2.App
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:781)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:724)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:554)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Теперь я точно ЗНАЮ, где моя ошибка. Это System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));, потому что мне нужно преобразовать rdd в массив. Тем не менее, документация API говорит, что я должен это сделать... это код, скопированный и вставленный из документации. Почему я не могу сериализовать RDD в массив?

Я уже вставил фиктивные данные в свою кассандру, используя вставки в своем посте, которые я включил в ссылку выше.

Кроме того, предыдущая ошибка, которую я решил, заключалась в том, что я изменил все свои геттеры и сеттеры на нижний регистр. Когда я использовал в них заглавные буквы, это выдавало ошибку. Почему я не могу использовать здесь заглавные буквы в моих геттерах и сеттерах?

Спасибо, Ангел


person angyxpoo    schedule 22.09.2014    source источник


Ответы (1)


Изменение public class App на public class App implements Serializable должно исправить ошибку. Поскольку внутренний класс Java будет содержать ссылку на внешний класс, ваш объект Function будет иметь ссылку на App. Поскольку Spark необходимо сериализовать ваш объект Function, он требует, чтобы App также был сериализуемым.

person zsxwing    schedule 23.09.2014
comment
Благодарю вас! Это сработало. Есть ли у вас понимание, почему это утверждение не работает? JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class).where("role=?", "IT Engineer").map(new Function<Person, String>() выдает ошибку, если я оставляю .where(), но если я удаляю его и оставляю .map, весь код работает. документально подтверждено, что .where должен работать - person angyxpoo; 23.09.2014
comment
Я разместил на нем еще один вопрос, но на него был дан ответ stackoverflow.com/questions/26001566/. Благодарю вас! - person angyxpoo; 24.09.2014