нужна помощь в присоединении к Spark RDD в Java

Необходимо выполнить следующую операцию соединения в искре

JavaPairRDD<String, Tuple2<Optional<MarkToMarketPNL>, Optional<MarkToMarketPNL>>> finalMTMPNLRDD = openMTMPNL.fullOuterJoin(closedMTMPNL);

Для выполнения этой операции мне нужны два JavaPairRDD, закрытыеMTMPNL и openMTMPNL. OpenMTM и closeMTM работают нормально, но keyBy на обоих RDD выдает ошибку во время выполнения.

JavaPairRDD<String,MarkToMarketPNL> openMTMPNL = openMTM.keyBy(new Function<MarkToMarketPNL,String>(){
                public String call(MarkToMarketPNL mtm) throws Exception
                {
                        return mtm.getTaxlot();
                }
            }); 

JavaPairRDD<String,MarkToMarketPNL> closedMTMPNL = closedMTM.keyBy(new Function<MarkToMarketPNL,String>(){
                    public String call(MarkToMarketPNL mtm) throws Exception
                    {
                        return mtm.getTaxlot();
                    }
                }); 

Есть ли другой способ присоединиться к RDD openMTM и closeMTM? На данный момент мы пытаемся получить два RDD, на которых можно выполнить соединение со строкой. Что вызывает исключение??

Прикрепление трассировки стека

java.lang.NullPointerException
15/06/28 01:19:30 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.lang.NullPointerException
    at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:53)
    at scala.collection.IterableLike$class.toIterator(IterableLike.scala:89)
    at scala.collection.AbstractIterable.toIterator(Iterable.scala:54)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)
15/06/28 01:19:30 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): java.lang.NullPointerException
    at scala.collection.convert.Wrappers$JIterableWrapper.iterator(Wrappers.scala:53)
    at scala.collection.IterableLike$class.toIterator(IterableLike.scala:89)
    at scala.collection.AbstractIterable.toIterator(Iterable.scala:54)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1626)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
    at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1095)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1765)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:63)
    at org.apache.spark.scheduler.Task.run(Task.scala:70)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

person 107    schedule 27.06.2015    source источник
comment
Мое первое предположение было бы в том, что некоторые из mtms являются нулевыми.   -  person abalcerek    schedule 27.06.2015


Ответы (3)


Это исключение связано с тем, что одна из ваших функций возвращает нулевое значение. Вы можете вернуть null и после этого отфильтровать нулевые кортежи, такие как:

JavaPairRDD<String,MarkToMarketPNL> openMTMPNL = openMTM.keyBy(new Function<MarkToMarketPNL,String>(){
            public String call(MarkToMarketPNL mtm) throws Exception
            {
                    return mtm.getTaxlot();
            }
        }).filter(new Function<Tuple2<String, MarkToMarketPNL>, Boolean>() {

        @Override
        public Boolean call(Tuple2<String, MarkToMarketPNL> arg) throws Exception {
            return arg == null ? false : true;
       }
    }); 
person Malemi    schedule 19.08.2015

Я думаю, что ошибка не в коде, который вы включили в вопрос. Spark пытается запустить count на RDD. Код, который вы включили, не вызывает count, так что это один признак. Но исключение предполагает, что подсчитываемый RDD имеет итератор, который был создан на Java и теперь преобразуется в итератор Scala. В этот момент оказывается, что этот итератор на самом деле null.

Ваш код где-то создает итератор? Возможно, в вызове mapPartitions или что-то в этом роде?

person Daniel Darabos    schedule 27.06.2015

Я столкнулся с той же проблемой. Когда операция соединения выполняется внутри, создается ‹key,Iterable‹values>>. Если один из объектов Iterable‹values> имеет значение null, мы видим исключение нулевого указателя, как указано выше.

Перед выполнением соединения убедитесь, что ни одно из значений не равно NULL.

person Abhiram    schedule 27.01.2017