Как сохранить местоположение контрольной точки Spark Streaming в S3?

Меня интересует приложение Spark Streaming (Spark v2.3.2), которое получает данные паркета S3 и записывает данные паркета в S3. Поток фреймов данных приложения использует groupByKey() и flatMapGroupsWithState() для использования GroupState.

Можно ли настроить это для использования местоположения контрольной точки s3? Например:

val stream = myDataset.writeStream
    .format("parquet")
    .option("path", s3DataDestination)
    .option("checkpointLocation", s3CheckpointPath)
    .option("truncate", false)
    .option(Trigger.Once)
    .outputMode(OutputMode.Append)
stream.start().awaitTermination()

Я подтвердил, что вышеуказанное может успешно записывать данные в файл s3DataDestination.

Однако возникает исключение при записи в расположение контрольной точки s3:

java.lang.IllegalStateException: Error committing version 1 into HDFSStateStore[id=(op=0, part=9), dir=s3://<my_s3_location>
    at org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider$HDFSBackedStateStore.commit(...)
...
Caused by: java.io.IOException: Failed to rename s3://.../checkpoint/state/0/9/temp... to s3://.../checkpoint/state/0/9/1.delta

Потребуется ли для этого специальная реализация S3 StateStoreProvider? Или местоположение контрольной точки нужно прописывать в HDFS?




Ответы (1)


Проблема в том, что частота параллелизма операций записи и чтения слишком высока. AWS S3 не предоставляет такой функции.

Решение :

  • Нам пришлось переключиться на локальный постоянный диск для контрольной точки Spark.
  • S3Guard: Это сделает чтение и запись S3 более последовательными (Примечание: это экспериментально, и я лично никогда не видел его в действии).
  • Использовать HDFS
person QuickSilver    schedule 18.06.2020