pyspark структурированная потоковая запись на паркет в пакетном режиме

Я делаю некоторые преобразования в структурированном потоковом фрейме данных Spark. Я сохраняю преобразованный фрейм данных в виде файлов паркета в hdfs. Теперь я хочу, чтобы запись в hdfs происходила партиями, а не сначала преобразовывала весь фрейм данных, а затем сохраняла фрейм данных.


person Y0gesh Gupta    schedule 26.04.2019    source источник
comment
можешь уточнить? ... ваши сохраненные паркетные файлы в hdfs выводятся из задания структурированной потоковой передачи или обычного искрового задания? или вы пытаетесь использовать структурированную потоковую передачу для записи мини-пакетов на паркет в hdfs?   -  person thePurplePython    schedule 26.04.2019
comment
Я пытаюсь записать паркетный файл мини-пакетами в hdfs из моей структурированной потоковой передачи. Мой источник структурированного потока - кафка.   -  person Y0gesh Gupta    schedule 26.04.2019
comment
Спасибо за разъяснение. Я предложил вам несколько решений для начала работы.   -  person thePurplePython    schedule 26.04.2019


Ответы (1)


Вот пример паркетной мойки:

# parquet sink example
targetParquetHDFS = sourceTopicKAFKA
    .writeStream
    .format("parquet") # can be "orc", "json", "csv", etc.
    .outputMode("append") # can only be "append"
    .option("path", "path/to/destination/dir")
    .partitionBy("col") # if you need to partition
    .trigger(processingTime="...") # "mini-batch" frequency when data is outputed to sink
    .option("checkpointLocation", "path/to/checkpoint/dir") # write-ahead logs for recovery purposes
    .start()
targetParquetHDFS.awaitTermination()

Для более подробной информации:

Интеграция Kafka: https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

Руководство по программированию SS: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks

добавлен

Хорошо ... Я добавил кое-что в ответ, чтобы прояснить ваш вопрос.

SS имеет несколько различных типов триггеров:

https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers

по умолчанию: следующий триггер срабатывает после завершения обработки предыдущего триггера.

фиксированные интервалы: .trigger(processingTime='10 seconds') поэтому триггер на 10 секунд срабатывает в 00:10, 00:20, 00:30

единовременно: обрабатывает все доступные данные одновременно .trigger(once=True)

непрерывный / фиксированный интервал между контрольными точками => лучше всего см. документ руководства по программированию

Поэтому в вашем примере Kafka SS может обрабатывать данные о временной метке события в микропакетах с помощью триггеров «по умолчанию», «фиксированный интервал» или « разовая "обработка всех данных, имеющихся в исходной теме Kafka.

person thePurplePython    schedule 26.04.2019
comment
Спасибо за Ваш ответ. Я пытаюсь понять, как триггер создает микропакет. Предположим, у меня есть 20 000 сообщений, отправленных из Kafka. Произойдет ли преобразование всех сообщений одновременно в виде единого микропакета или эти сообщения будут преобразованы в небольшие микропакеты в течение определенного интервала времени, а затем будут обрабатываться по одному. - person Y0gesh Gupta; 26.04.2019
comment
@ Y0geshGupta, пожалуйста ... Я добавил дополнительную информацию в ответ на ваш вопрос - person thePurplePython; 26.04.2019
comment
Спасибо за подробности, чтобы сделать его более понятным. Я все еще не уверен, что если я использую processingTime для запуска, будет ли он хранить файлы паркета из набора данных небольшими партиями (преобразовывать небольшими партиями, а затем сохранять), или он будет выполнять преобразование полных 20k записей в фреймворке данных и сохранять их как паркет, рассматривая его как единую микропакет. Я беру 20 тыс. Записей, потому что это начальное количество сообщений, приходящих из моей темы kafka. - person Y0gesh Gupta; 26.04.2019
comment
Если вы выполняете processingTime, он добавляет новые данные (в виде паркетных файлов) каждую частоту интервала запуска на основе времени события в метке времени исходных данных ... Я не очень разбираюсь в архитектуре Kafka, но я предполагаю, что ваши данные является потоковым и отслеживает время события наблюдения в виде отметки времени. - person thePurplePython; 26.04.2019