Простой подход к управлению зависимостями между вашими рабочими процессами

Если вы когда-либо создавали конвейеры данных для взаимозависимых бизнес-процессов, вы могли заметить, что объединение всей бизнес-логики вашей компании в один рабочий процесс плохо работает и быстро превращается в кошмар обслуживания. Многие системы планирования рабочих процессов позволяют нам управлять зависимостями в рамках единого конвейера данных, но они не поддерживают нас в управлении зависимостями между рабочими процессами.

Естественным способом решения этой проблемы было бы разделение большого конвейера на множество более мелких и согласование зависимостей между ними в некоторых родительско-дочерних отношениях. Однако есть много возможных способов решения этой проблемы, и я хочу поделиться одним простым подходом, который хорошо сработал для меня. Я надеюсь, что это поможет вам управлять зависимостями между вашими конвейерами данных.

Как сообщество Airflow пыталось решить эту проблему

В книге об Apache Airflow [1], созданной двумя инженерами по данным из GoDataDriven, есть глава об управлении зависимостями. Вот как они резюмировали проблему:

«Airflow управляет зависимостями между задачами в рамках одной группы DAG, однако не предоставляет механизма для взаимозависимостей между DAG».

SubDags

Общая проблема организации взаимозависимых конвейеров данных хорошо известна, и одним из способов ее решения было использование абстракции SubDag. Поначалу организация рабочих процессов в SubDags казалась отличной идеей, но быстро привела к множеству проблем:

  • один SubDag интерпретируется как один узел в графе в родительском конвейере данных, даже если этот дочерний DAG может состоять из многих задач
  • это означает, что каждая задача из дочерней группы обеспечения доступности баз данных выполняется последовательно по очереди, что может привести к взаимоблокировкам в процессах планирования.

Вот еще одна цитата [2], которая красиво резюмирует обсуждение полезности SubDags:

Астроном настоятельно рекомендует держаться подальше от SubDags.

ExternalTaskSensor

Другая идея заключалась в использовании датчика в родительском DAG, который регулярно проверяет состояние выполнения дочернего DAG. Это кажется лучшей идеей, чем SubDags. Однако он работает только в том случае, если расписания между родительской группой DAG и дочерней группой DAG выровнены, что является очень сильным предположением, которое в моем случае почти никогда не было верным. Представьте, что произойдет, если вы вручную активируете этот родительский DAG? Он будет работать вечно, пока не истечет время ожидания, потому что дочерний DAG не работает по тому же расписанию.

TriggerDagRunOperator

Другой способ взглянуть на это - запустить дочерний DAG из родительского DAG. Мне эта идея нравится больше, потому что:

  • нет риска возникновения тупиковых ситуаций
  • он также будет запускаться, когда конвейер запускается вне графика, например. через ручной спусковой крючок.

Однако когда я попробовал это в Airflow, я обнаружил, что текущая реализация TriggerDagRunOperator работает по принципу запустил и забыл. Это означает, что родительская группа DAG не дожидается завершения запущенной дочерней группы DAG перед запуском следующей задачи! Это не то, что нам нужно, если мы хотим управлять зависимостями между конвейерами данных.

Чтобы проиллюстрировать это, давайте посмотрим на следующий конвейер:

Если мы используем TriggerDagRunOperator для уровня бизнес-логики в родительской группе DAG, это вызовет запуск DagRun для этой дочерней группы DAG, но затем немедленно запустит задачу витрины данных перед ожиданием. пока не будут завершены все bus_logic_ETL_x задачи в дочерней группе DAG.

Как я решил это в Airflow

Мое решение было:

  • добавить фиктивную задачу finish в конец каждого дочернего DAG
  • реализовать WaitForCompletion датчик, который проверяет в БД метаданных Airflow состояние последнего DagRun дочернего DAG. Мы идентифицируем этот последний DagRun дочернего DAG следующим образом: он должен начинаться с "trig__", потому что каждый DagRun, запускаемый внешним DAG, называется таким образом. Затем мы сортируем эти DagRun по дате выполнения в порядке убывания и используем LIMIT 1 для получения последнего → это именно тот DagRun, который мы хотим найти.
  • внутри этого датчика мы ищем состояние задачи finish в этом DagRun. Если это success, то датчик завершает задачу и может перейти к задаче витрина данных в родительском конвейере. Но если это не успех, это означает, что задача либо все еще выполняется, либо завершилась неудачно. В обоих случаях это означает, что мы не можем перейти к задаче витрина данных.

Это реализация датчика:

И вот как я использовал этот датчик в родительском DAG:

Как префект подошел к этой проблеме

Prefect включает в себя множество полезных абстракций, которые работают "из коробки". Один из них - FlowRunTask, который включает параметр wait=True, который будет иметь тот же эффект, что и мой WaitForCompletion в Airflow, без необходимости использования какого-либо датчика, выполняющего периодический поиск в базе данных. Это будет:

  • запускать дочерний поток
  • дождитесь завершения дочернего потока, прежде чем переходить к следующей задаче.

Основной поток (т.е. родительский поток)

Вся реализация родительского потока очень проста:

Если мы визуализируем этот поток, запустив flow.visualize, мы увидим, что дочерние потоки даже визуализируются по-разному, чтобы легко отличить обычную задачу от потока, запускаемого FlowRunTask (или каким-либо другим типом задачи, например, например, сопоставленными задачами ).

Дочерние потоки

Я также создал несколько очень простых дочерних потоков, чтобы мы могли запустить полный пример и увидеть результат в пользовательском интерфейсе. В приведенных ниже сценариях вы можете увидеть три дочерних потока, каждый из которых имеет три задачи:

  • staging_area поток
  • business_logic_layer поток
  • data_mart поток

Теперь давайте запустим полный пример, зарегистрировав родительский поток (MasterFlow) и три дочерних потока, и убедимся, что зависимости между потоками работают должным образом.

Мы можем подтвердить, что зависимости были соблюдены и витрина данных начала работать только после того, как потоки промежуточной области и уровня бизнес-логики были завершены.

Если что-то не получится, мы сможем легко перейти к соответствующему дочернему потоку для проверки журналов:

Как запланировать взаимозависимые конвейеры данных?

Чтобы правильно структурировать и запланировать эти родительско-дочерние зависимости, мы можем использовать «главные» конвейеры данных (его также можно было бы назвать «родительским конвейером данных» или «координационным конвейером данных»), как показано на в приведенном выше примере и запланировать только эти родительские конвейеры. Поскольку дочерние потоки запускаются непосредственно из родительского потока, их не нужно планировать.

Покажем это на конкретном примере. Предположим, что в настоящее время эти дочерние потоки имеют следующие среды выполнения:

  • staging_area: 2,5 часа.
  • business_logic: 1,5 часа
  • data_mart: 2 часа.

Это означает, что если мы запланируем запуск MasterFlow на 2 часа ночи, он запустится с запуска staging_area в 2 часа ночи, а затем business_logic начнется в 4:30. и data_mart запустится в 6:00, а весь ETL будет завершен к 8:00.

Чтобы реализовать это, мы просто добавляем расписание в строках 6 и 9 в MasterFlow следующим образом:

Примечание. это расписание работает по всемирному координированному времени. Чтобы использовать расписание для конкретного часового пояса, используйте:

Каковы преимущества этого подхода MasterFlow?

Допустим, вы хотите изменить уровень бизнес-логики и добавить к нему еще одну задачу: используя этот подход, вы можете вносить изменения без какого-либо воздействия на промежуточную область и рабочие процессы витрины данных. Вам не нужно ничего менять в расписании, и вы не попадете в тупик.

Какая реализация проще с точки зрения пользователя?

В целом, мне показалось, что реализация Prefect проще, поскольку мне не нужно было писать какую-либо настраиваемую логику датчика, чтобы должным образом отражать зависимости между конвейерами данных, и она работает из коробки. Это показывает большую дальновидность в дизайне этих абстракций.

Сообщество Prefect также обсуждало SubFlow в этом выпуске Github [3] - следите за ним, если вам интересно.

Заключение

В этой статье мы рассмотрели как управлять зависимостями между конвейерами данных. Мы продемонстрировали, как Airflow исторически подходил к этой проблеме.

Затем я представил простое решение этой проблемы, основанное на принципе: создать конвейер основных данных, который планирует и запускает дочерние рабочие процессы, обеспечивая при этом, чтобы основные данные конвейер ожидает, пока дочерний рабочий процесс не завершится, прежде чем запускать следующий.

Парадигма «триггер + ожидание» кажется намного безопаснее, чем согласование взаимозависимых конвейеров с их расписаниями, и позволяет нам управлять зависимостями между рабочими процессами простым способом.

Спасибо за чтение! Если эта статья была полезной, не стесняйтесь подписываться на меня для следующих статей.

Ссылки:

[1] «Конвейеры данных с Apache Airflow» - Бас П. Харенслак и Джулиан Рутгер де Руйтер

[2] Блог астронома: https://www.astronomer.io/guides/subdags/

[3] Сообщество префектов о подпотоках: https://github.com/PrefectHQ/prefect/issues/1745