Я делаю некоторые преобразования в структурированном потоковом фрейме данных Spark. Я сохраняю преобразованный фрейм данных в виде файлов паркета в hdfs. Теперь я хочу, чтобы запись в hdfs происходила партиями, а не сначала преобразовывала весь фрейм данных, а затем сохраняла фрейм данных.
pyspark структурированная потоковая запись на паркет в пакетном режиме
Ответы (1)
Вот пример паркетной мойки:
# parquet sink example
targetParquetHDFS = sourceTopicKAFKA
.writeStream
.format("parquet") # can be "orc", "json", "csv", etc.
.outputMode("append") # can only be "append"
.option("path", "path/to/destination/dir")
.partitionBy("col") # if you need to partition
.trigger(processingTime="...") # "mini-batch" frequency when data is outputed to sink
.option("checkpointLocation", "path/to/checkpoint/dir") # write-ahead logs for recovery purposes
.start()
targetParquetHDFS.awaitTermination()
Для более подробной информации:
Интеграция Kafka: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html
Руководство по программированию SS: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks
добавлен
Хорошо ... Я добавил кое-что в ответ, чтобы прояснить ваш вопрос.
SS имеет несколько различных типов триггеров:
https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers
по умолчанию: следующий триггер срабатывает после завершения обработки предыдущего триггера.
фиксированные интервалы: .trigger(processingTime='10 seconds')
поэтому триггер на 10 секунд срабатывает в 00:10, 00:20, 00:30
единовременно: обрабатывает все доступные данные одновременно .trigger(once=True)
непрерывный / фиксированный интервал между контрольными точками => лучше всего см. документ руководства по программированию
Поэтому в вашем примере Kafka SS может обрабатывать данные о временной метке события в микропакетах с помощью триггеров «по умолчанию», «фиксированный интервал» или « разовая "обработка всех данных, имеющихся в исходной теме Kafka.