поэтому у меня ранее были вопросы о том, как запросить cassandra с помощью spark в проекте java maven здесь: Запрос данных в Cassandra через Spark в проекте Java Maven
Ну, на мой вопрос ответили, и это сработало, однако я столкнулся с проблемой (возможно, проблемой). Теперь я пытаюсь использовать Java API datastax. Вот мой код:
package com.angel.testspark.test2;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.io.Serializable;
import static com.datastax.spark.connector.CassandraJavaUtil.*;
public class App
{
// firstly, we define a bean class
public static class Person implements Serializable {
private Integer id;
private String fname;
private String lname;
private String role;
// Remember to declare no-args constructor
public Person() { }
public Integer getId() { return id; }
public void setId(Integer id) { this.id = id; }
public String getfname() { return fname; }
public void setfname(String fname) { this.fname = fname; }
public String getlname() { return lname; }
public void setlname(String lname) { this.lname = lname; }
public String getrole() { return role; }
public void setrole(String role) { this.role = role; }
// other methods, constructors, etc.
}
private transient SparkConf conf;
private App(SparkConf conf) {
this.conf = conf;
}
private void run() {
JavaSparkContext sc = new JavaSparkContext(conf);
createSchema(sc);
sc.stop();
}
private void createSchema(JavaSparkContext sc) {
JavaRDD<String> rdd = javaFunctions(sc).cassandraTable("tester", "empbyrole", Person.class)
.where("role=?", "IT Engineer").map(new Function<Person, String>() {
@Override
public String call(Person person) throws Exception {
return person.toString();
}
});
System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
}
public static void main( String[] args )
{
if (args.length != 2) {
System.err.println("Syntax: com.datastax.spark.demo.JavaDemo <Spark Master URL> <Cassandra contact point>");
System.exit(1);
}
SparkConf conf = new SparkConf();
conf.setAppName("Java API demo");
conf.setMaster(args[0]);
conf.set("spark.cassandra.connection.host", args[1]);
App app = new App(conf);
app.run();
}
}
вот моя ошибка:
Exception in thread "main" org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.angel.testspark.test2.App
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1020)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1018)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1018)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:781)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:724)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:554)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:190)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Теперь я точно ЗНАЮ, где моя ошибка. Это System.out.println("Data as Person beans: \n" + StringUtils.join("\n", rdd.toArray()));
, потому что мне нужно преобразовать rdd в массив. Тем не менее, документация API говорит, что я должен это сделать... это код, скопированный и вставленный из документации. Почему я не могу сериализовать RDD в массив?
Я уже вставил фиктивные данные в свою кассандру, используя вставки в своем посте, которые я включил в ссылку выше.
Кроме того, предыдущая ошибка, которую я решил, заключалась в том, что я изменил все свои геттеры и сеттеры на нижний регистр. Когда я использовал в них заглавные буквы, это выдавало ошибку. Почему я не могу использовать здесь заглавные буквы в моих геттерах и сеттерах?
Спасибо, Ангел