Мы уже представили, что такое Mars в предыдущей статье, и сделали его открытым исходным кодом на GitHub после тестирования на наших внутренних системах. В этой статье представлена ​​архитектура распределенного выполнения, реализованная Mars.

Архитектура

Mars предоставляет библиотеку для распределенного выполнения тензоров. Библиотека написана с использованием модели акторов, реализованной mars.actors, и включает в себя планировщики, воркеры и веб-сервисы.

Граф, отправленный клиентом в Mars Web Service, состоит из тензоров. Веб-сервис получает графики и отправляет их планировщику. Перед отправкой задания каждому воркеру планировщик Mars компилирует тензорный граф в граф, состоящий из фрагментов и операндов, а затем анализирует и разбивает граф. Затем планировщик создает серию OperandActors, которые управляют выполнением одного операнда во всех планировщиках на основе согласованного хэша. Операнды планируются в топологическом порядке. После выполнения всех операндов вся фигура помечается как завершенная, и клиент может получить результаты из Интернета. Весь процесс выполнения показан на следующем рисунке.

Отправка задания

Клиент отправляет задания в службу Mars через RESTful API. Клиент пишет код на тензоре, а затем преобразует тензорную операцию в граф, составленный из тензоров через session.run(tensor), и отправляет его в веб-API. После этого веб-API отправляет задание объекту SessionActor и создает объект GraphActor в кластере для анализа графа и управления им. Клиент начинает запрашивать состояние выполнения графа, пока выполнение не закончится.

В GraphActor мы сначала преобразуем тензорный граф в граф, состоящий из операндов и чанков в соответствии с настройками чанков. Этот процесс позволяет дополнительно разделить граф и выполнять его параллельно. После этого мы проводим серию анализов на графе, чтобы получить приоритет операндов и назначить воркеров начальному операнду. Подробнее об этой части см. в разделе «Подготовка графа выполнения». Затем каждый операнд создает OperandActor для управления конкретным выполнением операнда. Когда операнд находится в состоянии READY (как описано в разделе Operand state), планировщик выбирает целевого рабочего процесса для операнда, а затем задание передается рабочему процессу для фактического выполнения.

Контроль выполнения

Когда операнд передается рабочему процессу, OperandActor ожидает обратного вызова рабочего процесса. Если операнд выполняется успешно, назначается преемник операнда. Если операнд не удается выполнить, OperandActor пытается выполнить его несколько раз. Если это все еще не удается, выполнение помечается как неудачное.

Отмена задания

Клиенты могут отменять запущенные задания с помощью RESTful API. Запрос на отмену записывается в хранилище состояний графа, и вызывается интерфейс отмены на GraphActor. Если задание находится в фазе подготовки, оно завершается сразу после обнаружения запроса на остановку, в противном случае запрос отправляется каждому OperandActor и состояние устанавливается на ОТМЕНА. Если операнд в это время не выполняется, состояние операнда сразу устанавливается на ОТМЕНА. Если операнд выполняется, запрос на остановку отправляется рабочему процессу и вызывает ошибку ExecutionInterrupted, которая возвращается в OperandActor. В это время состояние операнда помечается как ОТМЕНА.

Подготовка графа выполнения

Когда тензорный граф передается планировщику Mars, создается более детализированный граф, состоящий из операндов и фрагментов, в соответствии с параметрами фрагментов, содержащимися в источнике данных.

Сжатие графика

После создания графа фрагментов мы уменьшаем размер графа, объединяя соседние узлы в графе. Это слияние также позволяет нам в полной мере использовать ускоренные библиотеки, такие как numexpr, для ускорения процесса вычислений. В настоящее время Mars объединяет только операнды, образующие единую цепочку. Например, при выполнении следующего кода:

import mars.tensor as mt
a = mt.random.rand(100, chunks=100)
b = mt.random.rand(100, chunks=100)
c = (a + b).sum()

Mars объединяет операнды ADD и SUM в узел FUSE. Операнды Rand не сливаются, потому что они не образуют простую прямую линию с ADD и SUM.

Распределение начального работника

Очень важно распределить рабочие операции по операндам для производительности выполнения графа. Случайное распределение начальных операндов может привести к огромным издержкам сети и неравномерному распределению заданий между разными работниками. Распределение неначальных узлов можно легко определить в соответствии с физическим распределением данных, сгенерированных его предшественником, и состоянием простоя каждого рабочего. Поэтому на этапе подготовки графа выполнения мы рассматриваем только выделение начальных операндов.

Мы должны следовать нескольким принципам для первоначального распределения рабочих. Во-первых, операнды, выделенные каждому рабочему процессу, должны быть максимально сбалансированы. Это позволяет вычислительному кластеру иметь более высокую степень использования на протяжении всего этапа выполнения, что особенно важно на заключительном этапе выполнения. Во-вторых, начальное выделение узлов требует минимального сетевого трафика при выполнении последующих узлов. Другими словами, начальное размещение узлов должно полностью соответствовать принципу локальности.

Следует отметить, что в некоторых случаях приведенные выше принципы могут противоречить друг другу. Решение по распределению с минимальным сетевым трафиком может быть очень неравномерным. Мы разработали эвристический алгоритм для получения баланса между двумя целями. Алгоритм описывается следующим образом:

  1. Выбирается первый начальный узел и первая машина в списке;
  2. В неориентированном графе, преобразованном из графа операндов, поиск в глубину начинается с узла;
  3. Если осуществляется доступ к другому нераспределенному начальному узлу, мы выделяем его машине, выбранной на шаге 1;
  4. Когда общее количество операндов, к которым осуществляется доступ, превышает среднее количество операндов, принятых каждым рабочим потоком, выделение останавливается;
  5. Если еще остались рабочие, которым не были выделены операнды, перейти к шагу 1. В противном случае вычисление завершается.

Политика планирования

При выполнении графа, состоящего из операндов, соответствующая последовательность выполнения уменьшает объем данных, временно хранящихся в кластере, тем самым уменьшая вероятность сброса данных на диск. Соответствующий рабочий процесс может уменьшить общий сетевой трафик во время выполнения.

Политика выбора операндов

Соответствующая последовательность выполнения может значительно уменьшить общий объем данных, временно хранимых в кластере. На следующем рисунке показан пример сокращения дерева. Кружки представляют операнды, квадраты представляют фрагменты, красный цвет представляет собой выполнение операнда, синий цвет представляет возможность выполнения операнда, зеленый цвет представляет собой фрагменты, сгенерированные операндом, которые были сохранены, а серый цвет представляет собой операнд и связанные с ним данные, которые были освобождены. . Предполагая, что у нас есть два воркера и каждый операнд использует одинаковое количество ресурсов, каждый рисунок ниже показывает состояние после 5 единиц времени выполнения при разных политиках. На рисунке слева показано, что узлы выполняются в соответствии с иерархией, а на рисунке справа показано, что узлы выполняются в порядке приоритета в глубину. Данные 6 чанков должны быть временно сохранены в левом графе, а данные только 2 чанков должны быть сохранены в правом графе.

Наша цель — уменьшить общий объем данных, хранящихся в кластере, поэтому мы установили политику приоритета для операндов в состоянии READY:

  1. Операнды с большей глубиной должны выполняться первыми;
  2. Операнды, на которые полагаются более глубокие операнды, должны выполняться первыми;
  3. Узлы с меньшими выходными размерами должны выполняться в первую очередь.

Политика отбора работников

Когда планировщик готов выполнить граф, рабочий процесс начального операнда определен. Мы выделяем воркеров для последующих операторов на основе воркера, где находятся входные данные. Если рабочий процесс имеет наибольший размер входных данных, он выбирается для выполнения последующих операндов. Если есть несколько рабочих процессов с одинаковым размером входных данных, состояние ресурсов каждого кандидата на роль рабочих процессов играет решающую роль.

Состояние операнда

Каждый оператор в Марсе отдельно запланирован OperandActor. Процесс выполнения — это процесс перехода состояния. В OperandActor мы определяем функцию перехода состояния для процесса входа в каждое состояние. Начальный операнд находится в состоянии READY во время инициализации, а неначальный операнд находится в состоянии UNSCHEDULED во время инициализации. При выполнении заданных условий операнд переходит в другое состояние и выполняются соответствующие операции. Процесс перехода состояния можно увидеть на следующем рисунке:

Далее описывается значение каждого состояния и операции, выполняемые Марсом в этих состояниях.

  • UNSCHEDULED: операнд находится в этом состоянии, когда его восходящие данные не готовы.
  • READY: Операнд находится в этом состоянии, когда все исходящие входные данные готовы. После перехода в это состояние OperandActor отправляет задания всем исполнителям, выбранным в AssignerActor. Если рабочий процесс готов выполнить задание, он отправляет сообщение планировщику. Планировщик отправляет сообщение об остановке другим рабочим процессам, а затем отправляет сообщение рабочему процессу, чтобы начать выполнение задания.
  • ВЫПОЛНЯЕТСЯ: Операнд находится в этом состоянии, когда его выполнение было начато. Когда он входит в это состояние, OperandActor проверяет, было ли отправлено задание. Если нет, OperandActor строит граф, состоящий из операндов FetchChunk и текущего операнда, и отправляет его рабочему процессу. После этого OperandActor регистрирует обратный вызов в рабочем потоке, чтобы получить сообщение о том, что задание выполнено.
  • FINISHED: Операнд находится в этом состоянии, когда задание завершено. Когда операнд входит в это состояние и не имеет преемников, GraphActor отправляется сообщение, чтобы определить, закончилось ли выполнение всего графа. В то же время OperandActor отправляет своим предшественникам и преемникам сообщение о том, что выполнение завершено. Если предшественник получает сообщение, он проверяет, завершены ли все преемники. Если это так, данные о текущем операнде могут быть освобождены. Если преемник получает сообщение, он проверяет, были ли завершены все прекурсоры. Если это так, состояние преемника может быть переведено в READY.
  • FREED: Операнд находится в этом состоянии, когда все его данные освобождены.
  • FATAL: Операнд находится в этом состоянии, когда все попытки повторного выполнения терпят неудачу. Когда операнд входит в это состояние, он передает то же состояние узлу-преемнику.
  • ОТМЕНА: Операнд находится в этом состоянии, когда он отменяется. Если задание выполняется в данный момент, рабочему потоку отправляется запрос на отмену выполнения.
  • ОТМЕНЕНО: операнд находится в этом состоянии, когда выполнение отменено и остановлено. Если выполнение входит в это состояние, OperandActor пытается перевести состояние всех преемников в ОТМЕНА.

Детали выполнения в Workers

Рабочий процесс Mars содержит несколько процессов, чтобы уменьшить влияние GIL на выполнение. Конкретное выполнение завершается в независимом процессе. Чтобы уменьшить ненужное копирование памяти и взаимодействие между процессами, рабочий процесс Mars использует общую память для хранения результатов выполнения.

Когда задание отправляется рабочему процессу, оно сначала помещается в очередь, ожидающую выделения памяти. При выделении памяти данные о других воркерах или данные, которые были сброшены на диск на текущем воркере, перезагружаются в память. На данный момент все данные, необходимые для вычислений, уже находятся в памяти, и фактический вычислительный процесс готов к запуску. Когда вычисление завершено, рабочий помещает задание в общее хранилище. Отношения перехода четырех состояний выполнения показаны на рисунке ниже.

Контроль выполнения

Рабочий процесс Mars контролирует выполнение всех операторов в рабочем процессе через ExecutionActor. Сам актор не участвует в реальных операциях или передаче данных, а только отправляет задачи другим акторам.

OperandActor в планировщике отправляет задания рабочему через enqueue_graphcall в ExecutionActor. Рабочий процесс принимает отправленные операнды и кэширует их в очереди. Когда задание может быть выполнено, ExecutionActor отправляет сообщение планировщику, который определяет, выполнять ли операцию. Когда планировщик решает выполнить операнд на текущем рабочем потоке, он вызывает метод start_execution и регистрирует обратный вызов через add_finish_callback. Эта конструкция позволяет получать результаты выполнения в нескольких местах, что полезно для устранения сбоев.

ExecutionActor использует модуль mars.promise для одновременной обработки запросов на выполнение от нескольких операндов. Конкретные шаги выполнения связаны через метод then класса Promise. Когда окончательный результат выполнения сохраняется, запускается ранее зарегистрированный обратный вызов. Если на каком-либо из предыдущих шагов выполнения возникает ошибка, она передается функции-обработчику, зарегистрированной методом catch, и обрабатывается.

Сортировка операндов

Все операнды в состоянии READY передаются рабочему процессу, выбранному планировщиком. Следовательно, в течение большей части времени выполнения количество операндов, переданных рабочему процессу, обычно больше, чем общее количество операндов, которые рабочий процесс может обработать. Рабочий должен отсортировать операнды, а затем выбрать некоторые из них для выполнения. Этот процесс сортировки выполняется в TaskQueueActor, который поддерживает приоритетную очередь, в которой хранится информация об операндах. В то же время TaskQueueActor регулярно запускает задачу выделения заданий, выделяя ресурсы выполнения для операндов в начале приоритетной очереди до тех пор, пока не закончатся дополнительные ресурсы для выполнения операндов. Этот процесс распределения также запускается, когда отправляется новый операнд или когда выполнение операнда завершается.

Управление памятью

Работник Марса управляет двумя аспектами памяти. Первая часть — это личная память каждого рабочего процесса, которая принадлежит каждому процессу. Вторая часть — это память, разделяемая всеми процессами, которая хранится в plasma_store в Apache Arrow.

Чтобы избежать переполнения памяти процесса, мы ввели QuotaActor рабочего уровня для выделения памяти процесса. Прежде чем операнд начнет выполняться, он отправляет пакет запросов памяти в QuotaActor для входных и выходных фрагментов. Если оставшееся пространство памяти может удовлетворить запрос, запрос принимается QuotaActor. В противном случае запросы ставятся в очередь для ожидания свободных ресурсов. Когда освобождается соответствующая память, запрошенные ресурсы также освобождаются. На этом этапе QuotaActor может выделять ресурсы другим операндам.

Общая память управляется плазмой_store, которая обычно занимает 50% всей памяти. Возможность переполнения отсутствует, поэтому эта часть памяти выделяется напрямую через соответствующий метод Plasma_store, минуя QuotaActor. Когда общая память израсходована, рабочий процесс Mars пытается сбросить некоторые неиспользуемые фрагменты на диск, чтобы освободить место для новых фрагментов.

Данные в порциях, выгружаемых из разделяемой памяти на диск, могут повторно использоваться последующими операндами, в то время как повторная загрузка данных с диска в разделяемую память может быть очень ресурсоемкой, особенно когда разделяемая память исчерпана и другим частям необходимо быть сброшены на диск для размещения загруженных фрагментов. Поэтому, когда совместное использование данных не требуется (например, чанк используется только одним операндом), мы загружаем чанк непосредственно в частную память процесса, а не в разделяемую память. Это может значительно сократить общее время выполнения задания.

Будущая работа

Марс в настоящее время проходит быструю итерацию. В ближайшем будущем мы рассматриваем возможность реализации аварийного переключения на уровне рабочего процесса и поддержки случайного воспроизведения, а также планируется аварийное переключение на уровне планировщика.

Ссылка: https://www.alibabacloud.com/blog/how-to-execute-mars-in-a-distributed-manner_594633?spm=a2c41.12715028.0.0