Я пытался выяснить, как предотвратить сбой Spark из-за проблем с памятью, когда я перебираю файлы паркета и несколько функций постобработки. Извините за поток текста, но это не совсем одна конкретная ошибка (я использую PySpark). Приносим извинения, если это нарушает правильную форму переполнения стека!
Базовый псевдокод:
#fileNums are the file name partitions in the parquet file
#I read each one in as a separate file from its "=" subdirectory
for counter in fileNums:
sparkDataFrame = sqlContext.read.parquet(counter)
summaryReportOne = sqlContext.sql.("SELECT.....")
summaryReportOne.write.partition("id").parquet("/")
summaryReportTwo = sqlContext.sql.("SELECT....")
summaryReportTwo.write.partition("id").parquet("/")
#several more queries, several involving joins, etc....
В этом коде используются искровые SQL-запросы, поэтому мне не удалось создать функцию-оболочку со всеми SQL-запросами / функциями и передать ее в foreach (который не может принимать sparkContext или sqlQuery в качестве входных данных) в отличие от стандарта для петля.
Технически это один большой паркетный файл с разделами, но он слишком велик, чтобы читать все сразу и запрашивать по нему; Мне нужно запустить функции на каждом разделе. Поэтому я просто запускаю обычный цикл Python в PySpark, где в каждом цикле я обрабатываю один паркетный раздел (подкаталог) и пишу соответствующие выходные отчеты.
Не уверены, сработает ли обертывание всего кода вокруг большого mapPartition () из-за размера всего файла паркета?
Но после нескольких циклов скрипт вылетает из-за ошибок памяти, в частности из-за ошибки кучи Java. (Я подтвердил, что нет ничего особенного в файле, для которого происходит сбой цикла; это происходит с любым случайным файлом, считываемым во втором или третьем цикле.)
Caused by: com.google.protobuf.ServiceException:
java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:244)
at com.sun.proxy.$Proxy9.delete(Unknown Source)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:526)
... 42 more
Caused by: java.lang.OutOfMemoryError: Java heap space
Я понимаю, что Spark не предназначен для выполнения в цикле, но эти SQL-запросы слишком сложны для стандартных упакованных функций Spark SQL, и мы составляем несколько сводных отчетов для каждого файла по разной статистике агрегирования.
Есть ли способ в основном очистить память в конце каждого индекса цикла? Удаление любых зарегистрированных временных таблиц с помощью sqlContext.dropTempTable () и очистка кеша с помощью sqlContext.clearCache () не помогли. Если я попытаюсь остановить sparkContext и перезапустить его в каждом цикле, я также получу ошибки, так как некоторые процессы еще не завершены (похоже, что раньше вы могли «изящно» останавливать контекст, но я не смог найти это в текущей документации PySpark.)
Я также должен отметить, что я не вызываю unpersist () для фреймов данных в цикле после того, как я закончил с ними, но я также не вызываю для них persist (); Я просто переписываю фреймы данных в каждом цикле (что может быть частью проблемы).
Я работаю с нашей командой инженеров над настройкой параметров памяти, но мы знаем, что уже выделяем достаточно памяти, чтобы завершить один цикл этого скрипта (и один цикл выполняется без ошибок).
Были бы полезны любые предложения, включая инструменты, которые могут быть лучше для этого случая использования, чем Spark. Я использую Spark версии 1.6.1.