Осмысление больших данных

Преодоление самых серьезных проблем Apache Spark

Подробное руководство по наиболее сложным аспектам Spark и способам их преодоления специалистами по данным и инженерам.

Компьютерное фото Камрана Айдинова - www.freepik.com

Примерно 6 лет назад я впервые использовал Apache Spark, который на тот момент был доказательством того, что я начал заниматься аналитикой «больших данных». Не было никаких сомнений в том, что освоение Spark было обязанностью любого подражателя в области науки о данных или инженера по данным; в конце концов, как еще использовать огромные объемы данных и распределенные вычисления на ЦП для создания наилучших возможных моделей машинного обучения?

Если бы в то время я мог видеть будущее в 2020 году, я, возможно, был бы немного удивлен тем, что большой процент практиков машинного обучения и искусственного интеллекта все еще не использует Spark или использует его только для инженерии данных, а не машинного обучения. Частично это, естественно, связано с частичным смещением интереса к методам машинного обучения, ориентированным на GPU, а не на CPU, особенно к глубокому обучению. Но для большинства приложений, не относящихся к обработке изображений и естественного языка, где полезность методов, ориентированных на ЦП, не вызывает сомнений, удивительно, что многие специалисты по данным до сих пор в значительной степени полагаются на инструменты машинного обучения машинного обучения, такие как Scikit-learn и нераспределенные версии XGBoost и LightGBM.

Лично мне жаль, потому что при правильном использовании Spark - невероятно мощный инструмент для всех, кто работает с данными, помогающий нам не тратить время на выяснение того, как разместить большие наборы данных в памяти и процессорах, и позволяющий нам чтобы иметь полный контроль над рабочим процессом анализа данных, включая извлечение данных, создание моделей и развертывание моделей в производстве и тестировании.

Проведя семинары и обучив десятки специалистов по данным и инженеров по Apache Spark, я смог понять самые большие проблемы, с которыми обычно сталкиваются пользователи с этим инструментом, почему они возникают и как их преодолевать. Это руководство, состоящее из двух частей, предназначено для тех, кто не только хочет иметь возможность использовать Spark, но и по-настоящему понять его внутреннее устройство, чтобы решать сложные проблемы и генерировать высокопроизводительный и стабильный код.

Обратите внимание: я предполагаю, что читатель уже имеет базовое представление о Spark, например, что такое драйверы и исполнители Spark, что наборы данных разделены на разделы, что такое отложенная оценка и основные структуры данных Spark.

Часть 1. Разбиение на разделы и управление ресурсами Spark

Соревнование

В отличие от обычного однопроцессорного Python (например, Pandas), где детали внутренней обработки представляют собой «черный ящик», выполнение распределенной обработки с использованием Spark требует от пользователя принятия потенциально огромного количества решений:

  • Сколько разделов использовать для каждого набора данных?
  • Когда нужно перераспределить набор данных?
  • Сколько исполнителей Spark использовать, сколько памяти и сколько ядер им выделить?
  • Сколько памяти выделить для драйвера Spark?

Чтобы усложнить ситуацию:

  • Типичный конвейер обработки включает в себя несколько операций, и каждая операция может приводить к очень разным размерам наборов данных, из-за чего маловероятно найти единый набор параметров, который подходит для всего конвейера данных.
  • Некоторые операции Spark автоматически изменяют количество разделов, что еще больше усложняет пользователю отслеживание того, сколько разделов используется для каждого набора данных. Например, операция соединения изменит количество разделов выходного набора данных на число, указанное в параметре конфигурации spark.sql.shuffle.partitions.

Что важно знать

Последствия каждого решения относительно разделения и управления ресурсами можно резюмировать следующим образом:

  1. Если мы используем слишком много разделов для определенной таблицы, это может привести к медленной работе / нерациональному использованию ресурсов по нескольким причинам, таким как накладные расходы на оптимизацию и распараллеливание, например, общий объем ЦП, необходимый для обработки определенного объема данных с помощью Spark. как один раздел намного быстрее, чем его обработка как несколько разделов. Это так, даже если разделы обрабатываются на одном физическом сервере;
  2. Если мы используем слишком мало разделов для определенной таблицы, это может привести к медленной работе / нерациональному использованию ресурсов, так как многие исполнители будут «бездействовать» после завершения обработки своих разделов, ожидая завершения работы других разделов. обработка;
  3. Если мы будем выполнять слишком частое повторное разбиение на разделы, это может привести к медленной работе / бесполезной трате ресурсов из-за перетасовки, которая заключается в перегруппировке разделов между исполнителями. Перемешивание - это дорогостоящая операция как с точки зрения обработки, так и с точки зрения памяти, и она серьезно ограничивает оптимизацию обработки Spark, которую мы обсудим позже;
  4. Если мы не используем достаточное повторное разделение, это может привести к медленной работе / бесполезной трате ресурсов из-за перегрузки исполнителя и простоя, вызванного несбалансированными разделами; т.е. очень большой раздел может привести к нехватке ресурсов у исполнителя с точки зрения ядер и памяти, а также к бездействию других исполнителей, пока они ждут обработки раздела;
  5. Если мы используем слишком много ядер для каждого исполнителя, это может привести к медленной работе / неэффективной трате ресурсов из-за более низкого, чем ожидалось, уровня распараллеливания. Когда исполнитель управляет более чем одним ядром, он пытается создать несколько потоков для одновременной обработки более чем одного раздела, позволяя совместно использовать переменные в памяти между потоками. Однако это распараллеливание несовершенно и может ограничиваться операциями ввода-вывода (например, записью в HDFS или другое распределенное хранилище данных). Часто одно ядро ​​будет использоваться, даже если пользователь назначил несколько ядер на исполнителя. Кроме того, поскольку исполнитель может работать только на одном физическом сервере, может оказаться невозможным выделить все ядра, запрошенные пользователем, в исполнителе. Количество ядер, эффективно используемых исполнителем, может отслеживаться пользователем через пользовательский интерфейс Spark;
  6. Если мы используем слишком мало ядер для каждого исполнителя, это может привести к медлительности / бесполезной трате ресурсов из-за накладных расходов, необходимых для создания каждого исполнителя. Поскольку Spark знает, что ядра одного и того же исполнителя гарантированно находятся на одном физическом сервере, он может оптимизировать обработку таким образом, чтобы накладные расходы на распараллеливание между ними были намного меньше, чем между исполнителями.

Чтобы справиться с изменением потребности в разделах и ресурсах на нескольких этапах конвейера, а также с трудностями при отслеживании количества разделов каждого промежуточного набора данных, есть две стратегии, которые можно использовать по отдельности или комбинировать:

  • Когда вычислительные ресурсы распределяются между пулом пользователей или пользователь одновременно выполняет несколько заданий, активация динамического распределения исполнителей с помощью параметров spark.dynamicAllocation.maxExecutors и spark.dynamicAllocation.enabled может значительно уменьшить время простоя вычислительных ресурсов Spark;
  • Разделение большого задания обработки на несколько меньших заданий, каждое с меньшим изменением размеров набора данных, позволяет нам лучше настраивать параметры конфигурации и количество разделов для каждого шага. Для каждого из этих небольших заданий мы также можем установить параметры spark.default.parallelism и spark.sql.shuffle.partitions соответствующим образом, чтобы предотвратить необходимость постоянного повторного разделения. Инструменты конвейера, такие как Airflow, могут быть чрезвычайно полезны для настройки и развертывания сложных задач с данными, состоящих из небольших заданий.

Часть 2: неизменяемость, отложенная оценка и оптимизация плана выполнения Spark

Соревнование

Два из первых вещей, которые узнает пользователь Spark, - это концепции неизменяемости и ленивого вычисления. Неизменяемость означает, что наборы данных Spark не могут быть изменены; каждая операция с набором данных создает новый набор данных. Ленивая оценка основана на том факте, что над наборами данных есть 2 типа операций:

  • Преобразования создают новый набор данных Spark в качестве вывода (Spark имеет свойство неизменяемости, поэтому он никогда не может изменять существующие наборы данных, а только создавать новые);
  • Действия принимают наборы данных Spark в качестве входных данных, но приводят к чему-то другому, кроме набора данных Spark, например к записи в хранилище, созданию локальной (не Spark) переменной или отображению чего-либо в пользовательском интерфейсе пользователя.

Когда процесс вызывает операцию преобразования, это приводит к созданию неизменного «плана выполнения» в памяти, который описывает, как сгенерировать набор данных на основе других наборов данных. Однако на данный момент фактических данных не создается. Только при вызове операции действия Spark оценивает планы выполнения входных данных, чтобы сгенерировать данные, необходимые для вычисления выходных данных. Поскольку сами входные данные могут иметь планы выполнения, которые зависят от других наборов данных, они рекурсивно оцениваются, и процесс продолжается.

Если в какой-то момент запускается другое действие, требующее повторного воссоздания того же промежуточного набора данных, процесс повторяется; план выполнения создается заново, и все шаги плана выполнения необходимо выполнять заново. Чтобы этого не произошло, пользователь может «сохранить» промежуточный набор данных. Когда «постоянный» набор данных создается посредством запуска плана выполнения, набор данных будет сохранен в распределенной памяти (или в некотором сконфигурированном распределенном хранилище, если в памяти недостаточно места), где он будет оставаться до тех пор, пока вручную не «не сохранится». »Или до тех пор, пока сборщик мусора Spark не определит, что набор данных« вне области видимости »(недоступен для работающего кода).

Многие пользователи Spark не задумываются о концепциях неизменяемости и ленивых вычислений и их практических последствиях, полагая, что достаточно знать, что мы должны «сохранить» промежуточный набор данных, когда он будет использоваться более одного раза , предотвращение дублирования вычислений более чем для одного действия требует одного и того же промежуточного набора данных.

Однако правда в том, что правильное использование неизменяемости и ленивых вычислений выходит далеко за рамки этого. На самом деле они тесно связаны с менее известным аспектом, оптимизацией плана выполнения, выполняемой с помощью Spark Catalyst. Непонимание этих концепций может легко привести к медленной и нестабильной обработке Spark, а также к потере пользователем большого количества времени на отладку и расшифровку загадочных сообщений об ошибках.

Что важно знать

  • Spark выполняет преобразования на уровне раздела, а не набора данных

У нас может сложиться ложное впечатление, что вызов действия приводит к последовательному выполнению серии преобразований, предшествующих действию, при этом каждый шаг генерирует один промежуточный набор данных. Если бы это было так, если бы DataFrame2 был сгенерирован путем преобразования DataFrame1, Spark сначала создаст DataFrame1, а затем создаст DataFrame2, как если бы мы использовали Pandas.

Но все не так просто. Когда вызывается действие, все необходимые планы выполнения фактически объединяются в один план выполнения, и, за исключением случая перетасовки, обработка каждого раздела выполняется независимо. Это означает, что преобразования выполняются на уровне раздела, а не набора данных. Таким образом, для одного раздела Spark может по-прежнему генерировать DataFrame1, тогда как для другого раздела Spark уже может генерировать DataFrame2 из DataFrame1. Например, у нас может быть такая ситуация:

С точки зрения производительности, это мощный механизм, поскольку он предотвращает потери процессора и памяти, т. Е. Ожидание генерации всего промежуточного набора данных перед переходом к следующему промежуточному набору данных. Еще одно преимущество преобразований на уровне разделов состоит в том, что, как мы увидим в следующем разделе, они позволяют Spark лучше оптимизировать обработку.

  • Spark оптимизирует планы выполнения, и чем больше планы выполнения, тем лучше для оптимизации

Spark Catalyst, несомненно, является одной из самых ценных функций Spark, так как эффективная распределенная обработка намного сложнее, чем эффективная обработка с одним ядром или одной памятью. По сути, Catalyst оптимизирует планы выполнения, чтобы максимизировать распределенную производительность. Например, в Spark выполнение нескольких операций в одной строке за раз, а затем переход к следующей строке и т. Д. - это очень сложная задача. быстрее, чем выполнение нескольких операций в одном столбце с последующим переходом к следующему столбцу, поэтому план выполнения оптимизируется соответствующим образом.

Важным фактором, который следует учитывать, является то, что один большой план выполнения имеет гораздо больший потенциал оптимизации, чем несколько более мелких. Точно так же, используя тот факт, что преобразования выполняются на уровне раздела, а не на уровне набора данных, Spark может комбинировать несколько преобразований для одних и тех же разделов и оптимизировать их для еще большей производительности. В нашем предыдущем примере результат после оптимизации будет таким:

  • Ленивая оценка и преобразования на уровне разделов Spark усложняют отладку

Однако с точки зрения разработки / отладки, если что-то не так или есть узкое место в производительности, ленивая оценка в сочетании с преобразованиями на уровне раздела затрудняет пользователю точное определение шага обработки, вызывающего ошибку. ошибка или узкое место. В конце концов, пользовательский интерфейс Spark будет сообщать пользователю только то, какое действие запуска вызывает ошибку или узкое место, но не фактическое преобразование.

Решение состоит в том, чтобы «заставить» Spark генерировать один полный промежуточный набор данных каждый раз, вставив несколько операторов persist(), за которыми следуют действия, такие как count() или take(). Однако этот код был бы крайне неэффективным из-за вышеупомянутой потери процессора / памяти и отсутствия оптимизации, поэтому лучше использовать такое решение только для целей разработки (не для производства) и на временной основе.

  • Слишком большой / слишком большой план выполнения также может стать проблемой

Я заметил одну вещь: планы выполнения не всегда масштабируются линейно с количеством операций, но иногда полиномиально или экспоненциально. Поскольку планы выполнения хранятся в памяти драйвера Spark (в отличие от постоянных объектов, которые хранятся в памяти исполнителей Spark), это может привести к тому, что Spark исчерпает память драйвера или станет очень медленным из-за оптимизации Spark Catalyst . Это несколько иронично, поскольку предполагается, что Catalyst ускоряет выполнение кода при развертывании на исполнителях Spark; но Catalyst, который работает в драйвере Spark, может стать узким местом самой обработки.

Это особенно верно для DataFrames с тысячами столбцов, где нам нужно выполнить несколько операций для всех столбцов. Свойство неизменности Spark усугубляет ситуацию, поскольку Catalyst постоянно создает и оценивает новые экземпляры планов исполнителей.

Упорство может быть эффективной стратегией решения этой проблемы. Это устранит необходимость в создании новых экземпляров плана исполнителя до создания постоянного промежуточного набора данных, а также предотвратит попытки Catalyst оптимизировать план выполнения перед созданием набора данных.

Тем не менее, для сложных, длинных конвейеров планы выполнения становятся чрезвычайно большими, и даже частое сохранение не может предотвратить замедление, вызванное оптимизацией или нехваткой памяти для драйвера, поскольку сохранение может предотвратить создание нескольких экземпляров больших планов выполнения, но не их создание в первую очередь. Кроме того, слишком много постоянного может стать проблемой, так как занимает распределенную память и место для хранения. Хотя частые попытки отказаться от настаивания и / или попытка структурировать код для максимизации сбора мусора, безусловно, помогают, это сложная инженерная задача, которая не всегда может предотвратить истощение ресурсов.

Одним из решений для этого является метод под названием контрольная точка, который заключается в сохранении и загрузке промежуточных результатов из соответствующей системы хранения, определенной пользователем. При эффективном использовании контрольные точки предотвращают как резкое увеличение размера планов выполнения, так и компрометацию слишком большого количества распределенных ресурсов Spark с помощью постоянных объектов.

Другое решение, о котором уже упоминалось, - разделить большое задание по обработке на более мелкие. Таким образом, планы выполнения не могут стать слишком большими, и нет необходимости беспокоиться об оставшихся объектах в памяти драйвера или исполнителей.

Заключение

В этой статье, безусловно, есть гораздо больше о Spark, например как структурировать код для повышения качества оптимизации Catalyst. Но я надеюсь, что эта статья может помочь нескольким людям преодолеть трудности со Spark и использовать его исключительную мощность и универсальность как для разработки данных, так и для машинного обучения. Хотя Spark остается сложным инструментом для работы, я могу честно рассказать о том, как Databricks и сообщество разработчиков ПО с открытым исходным кодом значительно улучшили его с точки зрения удобства и гибкости за те 6 лет, которые я использую.

Если вы хотите узнать, как использовать Spark и Apache Hadoop YARN для параллельного выполнения нескольких распределенных заданий, например, нескольких экземпляров обучения модели, прочтите мою другую статью.