Нулевой указатель сеанса Spark с контрольной точкой

Я включил контрольную точку, которая сохраняет журналы на S3. Если в каталоге контрольной точки НЕТ файлов, потоковая передача искры работает нормально, и я вижу файлы журналов, появляющиеся в каталоге контрольной точки. Затем я убиваю поток искры и перезапускаю его. На этот раз я начинаю получать исключение NullPointerException для искрового сеанса. Короче говоря, если в каталоге контрольной точки НЕТ файлов журнала, потоковая передача искры работает нормально. Однако, как только я перезапускаю потоковую передачу искры с помощью файлов журнала в каталоге контрольной точки, я начинаю получать исключение нулевого указателя в сеансе искры. Ниже приведен код:

object asf {
  val microBatchInterval = 5
  val sparkSession = SparkSession
    .builder()
    .appName("Streaming")
    .getOrCreate()

    val conf = new SparkConf(true)
    //conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
    val sparkContext = SparkContext.getOrCreate(conf)


  val checkpointDirectory = "s3a://bucketname/streaming-checkpoint"

  println("Spark session: " + sparkSession)

  val ssc = StreamingContext.getOrCreate(checkpointDirectory,
    () => {
      createStreamingContext(sparkContext, microBatchInterval, checkpointDirectory, sparkSession)
    }, s3Config.getConfig())

  ssc.start()
  ssc.awaitTermination()
}

  def createStreamingContext(sparkContext: SparkContext, microBatchInterval: Int, checkpointDirectory: String,spark:SparkSession): StreamingContext = {
    println("Spark session inside: " + spark)
    val ssc: org.apache.spark.streaming.StreamingContext = new StreamingContext(sparkContext, Seconds(microBatchInterval))
    //TODO: StorageLevel.MEMORY_AND_DISK_SER
    val lines = ssc.receiverStream(new EventHubClient(StorageLevel.MEMORY_AND_DISK_SER);
    lines.foreachRDD {
      rdd => {
        val df = spark.read.json(rdd)
        df.show()
      }
    }
    ssc.checkpoint(checkpointDirectory)
    ssc
  }
}  

И снова, в самый первый раз, когда я запускаю этот код (без файлов журнала в каталоге контрольной точки), я вижу распечатываемый фрейм данных. И если я запускаю файлы журнала в каталоге контрольной точки, я даже не вижу

println("Spark session inside: " + spark)

печатается и печатается в ПЕРВЫЙ раз. Ошибка:

Exception in thread "main" java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:111)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:109)
    at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:549)
    at org.apache.spark.sql.SparkSession.read(SparkSession.scala:605)

И ошибка происходит по адресу:

val df = spark.read.json(rdd)

Изменить: я добавил эту строку:

conf.set("spark.streaming.stopGracefullyOnShutdown","true")

и это все еще не имело значения, все еще получая NullPointerException.


person Ahmed    schedule 13.09.2017    source источник
comment
Вы меняли код между прогонами? Вы не можете изменить код при чекпоинте. Если это так, см. соответствующую документацию по искрам. Вам нужно изящно завершить работу и удалить или изменить каталог контрольной точки.   -  person Michel Lemay    schedule 13.09.2017
comment
Каждый раз, когда я запускал задание в первый раз, я очищал s3a://bucketname/streaming-checkpoint. А затем я нажимал Ctrl+C, чтобы отключить искровую стриминг. А затем снова запустите, и тогда я получу исключение нулевого указателя. И я использовал тот же код между прогонами   -  person Ahmed    schedule 13.09.2017


Ответы (2)


Чтобы ответить на мой собственный вопрос, это работает:

lines.foreachRDD {
  rdd => {
    val sqlContext:SQLContext = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate().sqlContext

    val df = sqlContext.read.json(rdd)
    df.show()
  }
}

Прохождение сеанса искры, созданного из rdd.sparkContext, работает

person Ahmed    schedule 14.09.2017
comment
Действительно, это многое объясняет... когда ваш код запускается на экзекьюторе, у него нет доступа к val spark. Я удивлен, что вы не получили исключение задачи, не сериализуемое. - person Michel Lemay; 14.09.2017
comment
Копайте дальше, sessionState — временный ленивый val. Это означает, что для инициализации переменной на каждом исполнителе зарезервирован слот. Он не использует тот же экземпляр, что и драйвер. Поскольку он ленив, он инициализируется при первом использовании, что происходит, когда вы снова загружаете свою контрольную точку. В этот момент он пытается использовать другое значение parentSessionState, которое равно null для исполнителей, поскольку оно также является временным. - person Michel Lemay; 14.09.2017
comment
На самом деле проблема возникает, когда я добавляю еще одну строку обработки вторичным классом после sqlContext.read.json. Например, я создаю экземпляр этого вторичного класса вне forEachRDD, и одним из параметров, который он принимает, является sparkSession при его создании. И в этом вторичном классе я использую sparkSession для работы с spark.sql и прочим. В основном это означает, что я должен создавать экземпляр этого вторичного класса каждый раз в forEachRDD, чтобы получить искровой сеанс от RDD... - person Ahmed; 15.09.2017

Просто, чтобы выразить это явно для новичков, это анти-шаблон. Создание набора данных внутри преобразования не допускается!

Как упомянул Мишель, исполнитель не будет иметь доступа к SparkSession.

person Jeevan    schedule 10.07.2018