Искра с Кассандрой: не удалось зарегистрировать spark.kryo.registrator

В настоящее время я сталкиваюсь с некоторыми проблемами, когда пытаюсь запустить Spark с Cassandra в автономном режиме.

Сначала я успешно запускаю параметр mater="local[4]" в SparkContext.

Затем я пытаюсь перейти в автономный режим. Я использовал:

Ubuntu: 12.04 Cassandra: 1.2.11 Spark: 0.8.0 Scala: 2.9.3 JDK: Oracle 1.6.0_35 Kryo: 2.21

Сначала я получил ошибку «непрочитанный блок». Как предложение в другой теме, я перехожу на использование сериализатора Kryo и добавляю Twitter Chill. Затем я получаю «Не удалось зарегистрировать spark.kryo.registrator» в своей консоли и исключение, как показано ниже:

13/10/28 12:12:36 INFO cluster.ClusterTaskSetManager: Lost TID 0 (task 0.0:0)
13/10/28 12:12:36 INFO cluster.ClusterTaskSetManager: Loss was due to java.io.EOFException
java.io.EOFException
    at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:109)
    at org.apache.spark.broadcast.HttpBroadcast$.read(HttpBroadcast.scala:150)
    at org.apache.spark.broadcast.HttpBroadcast.readObject(HttpBroadcast.scala:56)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
    at java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.io.ObjectInputStream.readObject(Unknown Source)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:435)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.lang.reflect.Method.invoke(Unknown Source)
    at java.io.ObjectStreamClass.invokeReadObject(Unknown Source)
    at java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.io.ObjectInputStream.defaultReadFields(Unknown Source)
    at java.io.ObjectInputStream.readSerialData(Unknown Source)
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.io.ObjectInputStream.readObject(Unknown Source)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
    at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:61)
    at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:129)
    at java.io.ObjectInputStream.readExternalData(Unknown Source)
    at java.io.ObjectInputStream.readOrdinaryObject(Unknown Source)
    at java.io.ObjectInputStream.readObject0(Unknown Source)
    at java.io.ObjectInputStream.readObject(Unknown Source)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:39)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:61)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:153)
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

Кто-то также сталкивался с EOFException в spark раньше, ответ - неправильно зарегистрировать регистратор. Я регистрирую Регистратор, следуя руководству Spark. Регистратор, как показано ниже:

    class MyRegistrator extends KryoRegistrator {
        override def registerClasses(kryo: Kryo) {
            kryo.register(classOf[org.apache.spark.rdd.RDD[(Map[String, ByteBuffer], Map[String, ByteBuffer])]])
            kryo.register(classOf[String], 1)
            kryo.register(classOf[Map[String, ByteBuffer]], 2)
        }
    }

И я также устанавливаю свойство так же, как это делает руководство.

    System.setProperty("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    System.setProperty("spark.kryo.registrator", "main.scala.MyRegistrator")

Может ли кто-нибудь дать мне несколько советов, где я сделал неправильно? Спасибо.


person cjcrobin    schedule 28.10.2013    source источник


Ответы (1)


Исходя из моего опыта, причины получения «EOFException» и «непрочитанного блока данных» одинаковы. Им не хватает некоторых библиотек при работе на кластерах. Самое сложное в том, что я добавил библиотеки с «сборкой sbt» в spark, и библиотеки фактически существуют в папке jars. Но искра до сих пор не может их найти и загрузить. Потом добавляю библиотеки в контекст искры, работает. Это означает, что мне нужно перенести библиотеки на каждый узел, указав в коде.

person cjcrobin    schedule 05.11.2013
comment
SparkContext.addJar() — это предпочтительный способ код доставки в кластер. Я не рекомендую модифицировать собственную сборку Spark для добавления пользовательских библиотек (если вы попробовали это, и это не сработало, вам, вероятно, нужно было скопировать обновленные сборки Spark на рабочие машины; это неудобно, поэтому я бы придерживался до addJar()). - person Josh Rosen; 06.11.2013