Самая большая комната в этом мире — это комната для совершенствования. С новыми технологиями и платформами, которые появляются быстрыми темпами, всегда появляются возможности для улучшения методов работы.

В этой статье рассматривается одно из таких путешествий, которое началось с проникновения в неизвестное пространство миграции существующих устаревших проектов в пространство больших данных. Выполнение этого на уровне предприятия имело свой собственный набор проблем как в функциональных, так и в нефункциональных областях.

Одной из основных причин этого шага было улучшение контроля над нашими решениями по мере того, как мы наращивали наши инновационные механизмы. Несмотря на то, что у нас были устаревшие системы, объем наших данных рос с каждым годом. Затем команды пришли к консенсусу по разработке новых решений для обеспечения масштабируемости, безопасности и управления. Это также хорошо увязывалось с преобразованием, которое мы применяли к нашему стеку технологий.

Почему Apache Spark?

Apache Spark — это ведущий механизм обработки данных с открытым исходным кодом, используемый для пакетной обработки, машинного обучения, потоковой обработки и крупномасштабного SQL (язык структурированных запросов). Он был разработан, чтобы сделать обработку больших данных быстрее и проще. С момента своего создания Spark приобрел огромную популярность как платформа для обработки больших данных и широко используется в различных отраслях и компаниях, которые имеют дело с большими объемами данных.

В этом посте будут представлены практические решения, позволяющие максимально увеличить наши шансы сократить время вычислений за счет оптимизации заданий Spark без необходимости поиска в Интернете или других источниках знаний.

Стратегия определяет различные этапы выполнения, при этом каждый этап выполнения основывается на предыдущем и сокращает время вычислений за счет внесения новых улучшений и рекомендаций.

Наш вариант использования был для сложного калькулятора капитала кредитного риска. Задача состояла в том, чтобы выполнить эти вычисления на чрезвычайно больших объемах данных с большей производительностью и меньшими затратами. Приведенные здесь рекомендации относятся к кластеру YARN и HDFS, но могут применяться и к другим инфраструктурам.

Мы начнем с обзора, описывающего различные этапы выполнения и улучшенное время выполнения на каждом этапе, как показано выше. Первые пять прогонов были выполнены на Spark 2.4, а последний — на Spark 3.1.

Вопросы для запуска 1: сериализация, формат файла Parquet и трансляция

1) Сериализация.Сериализация помогает преобразовывать объекты в потоки байтов и наоборот. Когда мы работаем над любым типом вычислений, наши данные преобразуются в байты и передаются по сети. Если мы будем передавать меньше данных по сети, время, необходимое для выполнения задания, соответственно уменьшится. Spark предоставляет два типа сериализации: Java и Kryo.

Сериализация Java

  • Предоставляется по умолчанию и может работать с любым классом, расширяющим java.io.Serializable.
  • Гибкий, но довольно медленный и приводит к большим сериализованным форматам для многих классов.

Сериализация Kryo

  • Быстрее и компактнее по сравнению с сериализацией Java
  • Требуется предварительная регистрация классов для лучшей производительности

2) Формат файла Parquet.Apache Parquet — это формат файла данных с открытым исходным кодом, ориентированный на столбцы, разработанный для эффективного хранения и поиска данных. Он обеспечивает эффективное сжатие данных и схемы кодирования с повышенной производительностью для обработки больших объемов сложных данных.

Столбчатые форматы привлекательны как с точки зрения размера файла, так и с точки зрения производительности запросов. Размеры файлов обычно меньше, чем эквиваленты, ориентированные на строки, поскольку значения из одного столбца хранятся рядом друг с другом. Кроме того, повышается производительность запросов, поскольку механизм запросов может пропускать ненужные столбцы.

3) Широковещательная рассылка. Соединение двух таблиц — это стандартная операция в Spark. Обычно между исполняющими узлами по сети происходит обмен большими объемами данных. Этот обмен может вызвать задержку в сети. Spark предлагает несколько стратегий объединения для оптимизации этой операции. Один из них — Broadcast Hash Join. Если одна из таблиц достаточно мала (по умолчанию 10 МБ, но может быть увеличена до 40 МБ), меньшую таблицу можно транслировать каждому исполнителю в кластере, и можно избежать операций перемешивания.

Широковещательное хеш-соединение происходит в 2 этапа: широковещательное и хеш-соединение.

  • Фаза широковещательной рассылки: небольшой набор данных передается всем исполнителям.
  • Фаза хэш-соединения: небольшой набор данных хешируется во всех исполнителях и объединяется с разделенным большим набором данных.

Вот несколько вещей, на которые следует обратить внимание в отношении вещания:

  1. Широковещательное отношение должно полностью умещаться в памяти каждого исполнителя, а также в драйвере, потому что последний начинает передачу данных.
  2. Когда размер передаваемых данных велик, вы получите исключение OutOfMemory.
  3. Вещание работает только для соединений equal(‘=’).
  4. Вещание работает для всех типов соединений (внутреннее, левое, правое), кроме полных внешних соединений.
  5. Spark развертывает эту стратегию соединения, когда размер одного из отношений соединения меньше пороговых значений (по умолчанию 10 МБ). Свойство Spark, определяющее этот порог, — spark.sql.autoBroadcastJoinThreshold (настраиваемый).

Прогон 1 занял 85 минут.

Вопросы для запуска 2: разрыв родословной

В нашем случае использования у нас есть сложные вычисления, которые включают итерационные и рекурсивные алгоритмы, которые выполняются на больших объемах данных. Каждый раз, когда мы применяем преобразования к DataFrame, план запроса увеличивается. Когда этот план запроса становится огромным, производительность резко снижается, что приводит к узким местам.

Нецелесообразно связывать множество преобразований в наследство, особенно когда вам нужно обработать огромный объем данных с минимальными ресурсами. Следовательно, предпочтительнее разорвать родословную.

Ниже приведен пример вычисления с преобразованиями и действиями. Чтобы ускорить работу и упростить обработку, мы разорвали родословную на шаге E.

Мы протестировали следующие три метода разрыва родословной:

Контрольная точка. Контрольная точка — это процесс усечения плана выполнения и его сохранения в надежной распределенной (HDFS) или локальной файловой системе. Это функция Spark, которая особенно полезна для высокоитеративных алгоритмов данных. Файлы контрольных точек можно использовать в последующих запусках заданий или программах-драйверах.

Чекпойнт может быть нетерпеливым или ленивым, в зависимости от флага нетерпеливого оператора чекпойнт. Первое используется по умолчанию и происходит сразу же по запросу, а второе происходит только при выполнении действия. Разработчики могут использовать следующий синтаксис:val сломанныйLineageDf = existsDf.checkpoint()

Локальная контрольная точка. Работает аналогично контрольной точке, но данные не передаются в HDFS, а вместо этого сохраняются в локальной файловой системе исполнителя. Если исполнитель будет уничтожен во время обработки, данные будут потеряны, и Spark не сможет воссоздать этот кадр данных из DAG (направленный ациклический граф). Разработчики могут использовать следующий синтаксис:
val сломанныйLineageDf = existsDf.localCheckpoint()

Запись данных в HDFS в паркете: когда мы делаем контрольную точку RDD/DataFrame, они сериализуются и сохраняются в HDFS. Он не хранит его в формате паркета, паркет обеспечивает эффективное хранение данных.

Прерывание родословной путем записи в HDFS в паркете дало нам лучшую производительность из трех вышеперечисленных.

После разрыва линии передачи второй забег занял 55 минут и 36 секунд,значительное улучшение по сравнению с первым забегом, который длился 85 минут.

Также было бы неплохо отметить, что кэширование предлагает альтернативу для повышения производительности без нарушения наследования. Разницу между ними и когда использовать один над другим, можно прочитать здесь.

Вопросы для запуска 3: правое перетасовывание разделов

Выбор правильного номера раздела для перетасовки помогает повысить производительность работы. Разделение определяет степень параллелизма в задании, так как между задачей и разделом существует взаимно-однозначная корреляция (каждая задача обрабатывает один раздел).

Идеальный размер каждого раздела составляет около 100–200 МБ. Разделы меньшего размера увеличивают количество параллельно выполняемых заданий, что может повысить производительность, но слишком маленький раздел приведет к накладным расходам и увеличит время сборки мусора. Большие разделы уменьшат количество параллельно выполняемых заданий, а также оставят некоторые ядра бездействующими, что увеличит время обработки.

Как в случае перемешивания выбрать правильный номер раздела для перемешивания (spark.sql.shuffle.partitions)?

Мы следовали следующим рекомендациям Spark и применяли их на практике:

  1. Если промежуточные данные слишком велики, мы должны увеличить разделы в случайном порядке, чтобы сделать разделы меньше.
  2. В случае бездействия ядер во время выполнения задания увеличение количества разделов в случайном порядке помогает повысить производительность задания.
  3. Если промежуточные разделы небольшие (в КБ), то помогает уменьшение разделов в случайном порядке.
  4. Для кластера с огромной емкостью количество разделов должно быть в 1–4 раза больше числа ядер, чтобы добиться оптимальной производительности. Например, с данными 40 ГБ и 200 ядер установите для раздела перемешивания значение 200 или 400.
  5. Для кластера с ограниченной емкостью для перемешивания разделов можно установить размер входных данных / размер раздела (100–200 МБ на раздел). Наилучшим сценарием было бы установить раздел в случайном порядке кратным количеству ядер для достижения максимального параллелизма, в зависимости от емкости кластера. Например:
    - для данных объемом 1 ГБ с 6 ядрами (исполнительные ядра 3 с максимальным исполнителем 2). Идеальное количество разделов для перемешивания должно быть 12 (в 2 раза больше ядер) с размером раздела 100 МБ.
     – Для данных объемом 20 ГБ с 40 ядрами установите для разделов для перемешивания значение 120 или 160 (в 3–4 раза больше размера раздела). ядер) с размером раздела 200 МБ).

Мы выполнили нашу работу с правильным перетасовочным разделом, и это заняло 45 минут и 39 секунд.

Рекомендации для выполнения 4: оптимизация кода

Для этого прогона мы сосредоточились на внесении двух изменений на уровне кода.

1. Замените объединение и агрегирование функциями окна: в большинстве наших вычислений нам приходилось выполнять агрегирование для указанных столбцов. Результат должен был быть сохранен как новый столбец. В данном случае эта операция состоит из агрегации, за которой следует соединение.

Более оптимизированным вариантом здесь было бы использование оконных функций. Заменив в нашем коде объединение и агрегацию оконными функциями, мы добились значительного улучшения.

Простой тест и DAG-представление двух методов можно найти здесь.

2. Заменить withColumn на Select: каждая операция над DataFrame приводит к созданию нового DataFrame. В тех случаях, когда нам нужно многократно вызывать withColumn, лучше иметь один DataFrame.

Вместо использования:

использовать:

DAG также создает ненужные перетасовки, когда у нас есть withColumn с несколькими спецификациями окна:

Использование выбора:

Из приведенного выше плана выполнения мы видим, что использование Select — лучший выбор.

Прогон 4 занял всего 33 минуты и 53 секунды.

Рекомендации для запуска 5: спекулятивное исполнение

Apache Spark имеет функцию спекулятивного выполнения для обработки медленных задач на этапе из-за проблем среды, таких как медленная сеть. Если одна задача на этапе выполняется медленно, драйвер Spark может запустить для нее задачу предположения на другом хосте. Между обычной задачей и задачей-спекуляцией система Spark возьмет результат первой успешно выполненной задачи и убьет более медленную.

В случае длительных заданий (где некоторые задачи выполняются медленнее других) — что можно определить, отслеживая время, затрачиваемое через пользовательский интерфейс Spark, — включение спекуляции может помочь.

Если для параметра spark.speculation задано значение true, то медленные задачи определяются на основе медианы, вычисляемой путем определения времени выполнения других задач. После выявления медленно выполняющихся заданий на других узлах инициируются спекулятивные задачи для завершения задания.

Включение предположения привело к тому, что запуск 5 занял всего 28 минут 19 секунд.

Рекомендации для выполнения 6: включение AQE (выполнение адаптивного запроса)

Для этого прогона мы включили AQE, основную функцию Spark 3.0. AQE можно включить, задав SQL config spark.sql.adaptive.enabled значение true (false — значение по умолчанию в Spark 3.0).

В Spark 3.0 платформа AQE поставляется с тремя функциями:

1) Динамическое объединение разделов в случайном порядке упрощает или даже позволяет избежать настройки количества разделов в случайном порядке. Пользователи могут вначале задать относительно большое количество разделов для перемешивания, а AQE сможет затем во время выполнения объединить соседние небольшие разделы в более крупные.
(установитеspark.sql.adaptive. CoalescePartitions.enabled=true)

2) Динамическое переключение стратегий соединения частичнопредотвращает выполнение неоптимальных планов из-за отсутствия статистики и/или неправильной оценки размера. Эта адаптивная оптимизация автоматически преобразует объединение сортировки-слияния в соединение широковещательного хеширования во время выполнения, что еще больше упрощает настройку и повышает производительность.
(установитеspark.sql.adaptive.localShuffleReader.enabled=true)

3) Динамическая оптимизация косых соединений — еще одно важное улучшение производительности, поскольку косые соединения могут привести к крайнему дисбалансу работы и серьезному снижению производительности. После того, как AQE обнаружит любой перекос в статистике файла в случайном порядке, он может разбить перекошенные разделы на меньшие и соединить их с соответствующими разделами с другой стороны. Эта оптимизация может распараллелить обработку искажений и повысить общую производительность.
(установитеspark.sql.adaptive.skewJoin.enabled=true)

После обновления до Spark 3.0 (ознакомьтесь со списком новейших функций здесь) и включения функций AQE наша работа заняла 22 минуты 37 секунд.

Заключение

В этой статье объясняются различные стратегии, применяемые для оптимизации Spark в среде, в которой задействованы сложные вычисления.

Переход от использования стандартных методологий Spark к сериализации, форматированию файла паркета и трансляции к разрыву наследования позволил нам добиться значительного сокращения времени выполнения задания.

Затем мы улучшили это, используя правильный раздел перемешивания и оптимизировав наш код, а также используя спекулятивное выполнение для дальнейшего повышения производительности.

Наконец, мы развернули более новую версию Spark, которая позволила нам использовать функцию AQE (недоступную в Spark 2.x), что еще больше сократило время выполнения.

Я надеюсь, что эта статья поможет вам в вашем путешествии по оптимизации Spark, где сочетание вышеперечисленных стратегий может обеспечить наилучшую производительность при ограниченных ресурсах. В конце концов, это улучшило нашу производительность более чем на 70%. с 85 минут до 22 минут 37 секунд.

Гаурав работает инженером по машинному обучению и данным в DBS. Он всегда рад пообщаться в LinkedIn.

Рекомендации

https://spark.apache.org/docs/latest/sql-performance-tuning.html

https://databricks.com/blog/2020/06/18/introduction-apache-spark-3-0-now-available-in-databricks-runtime-7-0.html?fireglass_rsn=true

Использование спекуляции Spark для выявления и перепланирования медленных задач — Блог Юваля Ицчакова

«Как команда финансовой платформы DBS оптимизирует рабочие места Spark для повышения производительности | от кгшукла | Технический блог DBS | Середина"