Итерация / зацикливание паркетных файлов Spark в сценарии приводит к ошибке / накоплению памяти (с использованием запросов Spark SQL)

Я пытался выяснить, как предотвратить сбой Spark из-за проблем с памятью, когда я перебираю файлы паркета и несколько функций постобработки. Извините за поток текста, но это не совсем одна конкретная ошибка (я использую PySpark). Приносим извинения, если это нарушает правильную форму переполнения стека!

Базовый псевдокод:

#fileNums are the file name partitions in the parquet file
#I read each one in as a separate file from its  "=" subdirectory
for counter in fileNums:
  sparkDataFrame = sqlContext.read.parquet(counter)
  summaryReportOne = sqlContext.sql.("SELECT.....")
  summaryReportOne.write.partition("id").parquet("/")
  summaryReportTwo = sqlContext.sql.("SELECT....")
  summaryReportTwo.write.partition("id").parquet("/")
  #several more queries, several involving joins, etc....

В этом коде используются искровые SQL-запросы, поэтому мне не удалось создать функцию-оболочку со всеми SQL-запросами / функциями и передать ее в foreach (который не может принимать sparkContext или sqlQuery в качестве входных данных) в отличие от стандарта для петля.

Технически это один большой паркетный файл с разделами, но он слишком велик, чтобы читать все сразу и запрашивать по нему; Мне нужно запустить функции на каждом разделе. Поэтому я просто запускаю обычный цикл Python в PySpark, где в каждом цикле я обрабатываю один паркетный раздел (подкаталог) и пишу соответствующие выходные отчеты.

Не уверены, сработает ли обертывание всего кода вокруг большого mapPartition () из-за размера всего файла паркета?

Но после нескольких циклов скрипт вылетает из-за ошибок памяти, в частности из-за ошибки кучи Java. (Я подтвердил, что нет ничего особенного в файле, для которого происходит сбой цикла; это происходит с любым случайным файлом, считываемым во втором или третьем цикле.)

Caused by: com.google.protobuf.ServiceException:     
java.lang.OutOfMemoryError: Java heap space
at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:244)
at com.sun.proxy.$Proxy9.delete(Unknown Source)
at    org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.delete(ClientNamenodeProtocolTranslatorPB.java:526)
... 42 more
Caused by: java.lang.OutOfMemoryError: Java heap space

Я понимаю, что Spark не предназначен для выполнения в цикле, но эти SQL-запросы слишком сложны для стандартных упакованных функций Spark SQL, и мы составляем несколько сводных отчетов для каждого файла по разной статистике агрегирования.

Есть ли способ в основном очистить память в конце каждого индекса цикла? Удаление любых зарегистрированных временных таблиц с помощью sqlContext.dropTempTable () и очистка кеша с помощью sqlContext.clearCache () не помогли. Если я попытаюсь остановить sparkContext и перезапустить его в каждом цикле, я также получу ошибки, так как некоторые процессы еще не завершены (похоже, что раньше вы могли «изящно» останавливать контекст, но я не смог найти это в текущей документации PySpark.)

Я также должен отметить, что я не вызываю unpersist () для фреймов данных в цикле после того, как я закончил с ними, но я также не вызываю для них persist (); Я просто переписываю фреймы данных в каждом цикле (что может быть частью проблемы).

Я работаю с нашей командой инженеров над настройкой параметров памяти, но мы знаем, что уже выделяем достаточно памяти, чтобы завершить один цикл этого скрипта (и один цикл выполняется без ошибок).

Были бы полезны любые предложения, включая инструменты, которые могут быть лучше для этого случая использования, чем Spark. Я использую Spark версии 1.6.1.


person kplaney    schedule 20.05.2016    source источник


Ответы (2)


Обновление: если я вызываю unpersist () для каждой таблицы, которую я создаю из запроса sql после того, как я закончил с ним в каждом цикле, цикл может успешно продолжиться на следующей итерации без проблем с памятью. .clearCache () и удаление только временных таблиц, как отмечалось выше, не помогли. Я предполагаю, что это сработало, потому что, хотя таблицы были из запросов sparkSQL, которые возвращают RDD.

Несмотря на то, что я не вызывал persist () для этих RDD, мне пришлось сказать Spark, чтобы он очистил их до начала следующего цикла, чтобы я мог назначать новые SQL-запросы этим же именам переменных.

person kplaney    schedule 20.05.2016
comment
Кроме того, просто к сведению для новичков Spark: если функция не является UDF (что обычно подразумевает, что это несколько просто или возвращает, скажем, только один столбец, а не всю таблицу), и вы можете действовать на фрейм данных Spark, используя традиционные функции UDF API / синтаксис, затем избегайте функций и просто напишите свой код как один длинный скрипт. Это выглядит некрасиво, но, похоже, это ускорило мой код почти на 50%. И избегайте просто цикла в Python - вызывайте свой искровый скрипт из Bash в цикле, чтобы вы начинали со свежего контекста Spark каждый раз, когда обрабатываете новый файл / фрейм входных данных. Это позволит избежать проблем с памятью. - person kplaney; 25.05.2016

Если у вас есть возможность, попробуйте обновиться до недавно выпущенной Spark 2.0.

Я столкнулся с очень похожей проблемой с пространством кучи java, как и вы. Я смог превысить 4G пространства кучи, просто повторив процесс создания фрейма данных и снова и снова вызывая его с помощью Spark 1.6.2.

С Spark 2.0, использующим SparkSession, та же программа получила только 1,2 ГБ места в куче, а использование памяти было очень стабильным, как я ожидал от этой программы, которую я запускал.

person noahpc    schedule 08.08.2016