Я включил контрольную точку, которая сохраняет журналы на S3. Если в каталоге контрольной точки НЕТ файлов, потоковая передача искры работает нормально, и я вижу файлы журналов, появляющиеся в каталоге контрольной точки. Затем я убиваю поток искры и перезапускаю его. На этот раз я начинаю получать исключение NullPointerException для искрового сеанса. Короче говоря, если в каталоге контрольной точки НЕТ файлов журнала, потоковая передача искры работает нормально. Однако, как только я перезапускаю потоковую передачу искры с помощью файлов журнала в каталоге контрольной точки, я начинаю получать исключение нулевого указателя в сеансе искры. Ниже приведен код:
object asf {
val microBatchInterval = 5
val sparkSession = SparkSession
.builder()
.appName("Streaming")
.getOrCreate()
val conf = new SparkConf(true)
//conf.set("spark.streaming.receiver.writeAheadLog.enable", "true")
val sparkContext = SparkContext.getOrCreate(conf)
val checkpointDirectory = "s3a://bucketname/streaming-checkpoint"
println("Spark session: " + sparkSession)
val ssc = StreamingContext.getOrCreate(checkpointDirectory,
() => {
createStreamingContext(sparkContext, microBatchInterval, checkpointDirectory, sparkSession)
}, s3Config.getConfig())
ssc.start()
ssc.awaitTermination()
}
def createStreamingContext(sparkContext: SparkContext, microBatchInterval: Int, checkpointDirectory: String,spark:SparkSession): StreamingContext = {
println("Spark session inside: " + spark)
val ssc: org.apache.spark.streaming.StreamingContext = new StreamingContext(sparkContext, Seconds(microBatchInterval))
//TODO: StorageLevel.MEMORY_AND_DISK_SER
val lines = ssc.receiverStream(new EventHubClient(StorageLevel.MEMORY_AND_DISK_SER);
lines.foreachRDD {
rdd => {
val df = spark.read.json(rdd)
df.show()
}
}
ssc.checkpoint(checkpointDirectory)
ssc
}
}
И снова, в самый первый раз, когда я запускаю этот код (без файлов журнала в каталоге контрольной точки), я вижу распечатываемый фрейм данных. И если я запускаю файлы журнала в каталоге контрольной точки, я даже не вижу
println("Spark session inside: " + spark)
печатается и печатается в ПЕРВЫЙ раз. Ошибка:
Exception in thread "main" java.lang.NullPointerException
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:111)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:109)
at org.apache.spark.sql.DataFrameReader.<init>(DataFrameReader.scala:549)
at org.apache.spark.sql.SparkSession.read(SparkSession.scala:605)
И ошибка происходит по адресу:
val df = spark.read.json(rdd)
Изменить: я добавил эту строку:
conf.set("spark.streaming.stopGracefullyOnShutdown","true")
и это все еще не имело значения, все еще получая NullPointerException.