Автоматизация вашего первого рабочего процесса с помощью Python

В этом руководстве обсуждаются основные шаги, необходимые для создания вашего первого рабочего процесса с использованием Apache Airflow.

Прежде чем начать, вам необходимо настроить среду Airflow, чтобы иметь возможность выполнять каждый из шагов, описанных в этой статье. Если вы еще этого не сделали, мы считаем эту статью одной из наших любимых.

Почему Airflow?

Вы можете спросить, зачем вообще использовать Airflow? Airflow помогает решить множество проблем за счет автоматизации рабочих процессов и управления скучными и избыточными ручными задачами.

По определению, Apache Airflow - это платформа для программного создания, планирования и мониторинга рабочих процессов, также известных как DAG (см. Github).

Вы можете использовать Airflow для написания ETL, конвейеров машинного обучения и общего планирования заданий (например, Cron), включая резервное копирование базы данных и планирование развертывания кода / конфигурации.

Мы обсудили некоторые преимущества использования Airflow в нашем сравнении Airflow и Luigi.

Что такое трубопроводы воздушного потока

Конвейер Airflow - это, по сути, набор параметров, написанных на Python, которые определяют объект Airflow Directed Acyclic Graph (DAG). Различные задачи в рамках рабочего процесса образуют график, который является направленным, потому что задачи упорядочены. Чтобы не зацикливаться на бесконечном цикле, на этом графике нет циклов, поэтому он ациклический.

Например, если у нас есть три задачи с именами Foo, Bar и FooBar, может случиться так, что Foo запускается первым, а Bar и FooBar зависят от Foo завершения.

Это создаст базовый график, подобный приведенному ниже. Как видите, путь ясен. А теперь представьте себе это с десятками сотен задач. Важно иметь четкую структуру того, как эти задачи выполняются и в каком порядке.

После этого основного объяснения давайте создадим вашу первую группу DAG.

Если вы перешли по ссылке выше для настройки Airflow, то вы должны были создать каталог, который указывает переменную AIRFLOW_HOME на папку. По умолчанию это должна быть папка с именем airflow. В этой папке вам нужно будет создать папку DAG. Вы хотите создать свой первый DAG в папке DAG, как показано ниже.

Установить default_args

Разбив это, нам нужно будет создать словарь Python, содержащий все аргументы, применяемые ко всем задачам в вашем рабочем процессе. Если вы посмотрите на приведенный ниже код, то увидите несколько основных аргументов, включая owner (в основном просто имя владельца DAG) и start_date задачи (определяет день выполнения первого момента задачи DAG)

Airflow создан для обработки как инкрементальных, так и исторических прогонов. Иногда вы просто не хотите планировать рабочий процесс и просто запускаете задачу на сегодня. Вы также можете начать запускать задачи с определенного дня в прошлом (например, день назад), что настроено в первом фрагменте кода ниже.

В данном случае start_date - день назад. Ваша первая группа DAG будет обрабатывать вчерашние данные, а затем в любой день после этого.

Вот еще несколько ключевых параметров.

  • end_date в коде определит дату последнего выполнения. Указание даты окончания ограничивает поток воздуха за пределы даты. Если вы не укажете дату окончания, Airflow будет работать вечно.
  • depends_on_past - логическое значение. Если вы установите для него значение true, текущий запущенный тестовый экземпляр будет полагаться на статус предыдущей задачи. Например, предположим, что вы установили для этого аргумента значение true, в данном случае ежедневный рабочий процесс. Если выполнить вчерашнюю задачу не удалось, то двухдневная задача не будет запущена, поскольку это зависит от статуса предыдущей даты.
  • email - это как раз то место, откуда вы получите уведомление по электронной почте. Вы можете настроить личный адрес электронной почты в файле конфигурации.
  • email on failure используется, чтобы определить, хотите ли вы получать уведомление в случае сбоя.
  • email on retry используется, чтобы определить, хотите ли вы получать электронное письмо каждый раз при повторной попытке.
  • retries указывает, сколько раз Airflow будет пытаться повторить неудачную задачу.
  • retry-delay - это время между последовательными повторными попытками.

В этом примере Airflow будет повторять попытку каждые пять минут.

Качественный рабочий процесс должен иметь возможность предупреждать / сообщать о сбоях, и это одна из ключевых вещей, которых мы стремимся достичь на этом этапе. Airflow специально разработан для упрощения кодирования в этой области. Здесь может оказаться полезным рассылка сообщений о сбоях по электронной почте.

Настроить расписание DAG

Этот шаг посвящен созданию экземпляра группы DAG путем присвоения ей имени и передачи аргумента по умолчанию для вашей группы DAG: default_args=default_args.

Затем установите интервал расписания, чтобы указать, как часто DAG должен запускаться и выполняться. В данном случае это всего один раз в день.

Ниже приведен один из способов настройки DAG.

Если вы хотите запускать свое расписание ежедневно, используйте следующие параметры кода: schedule_interval=’@daily’. Или вы можете использовать cron, например: schedule_interval=’0 0 * * *’.

Разложите все задачи

В приведенном ниже примере у нас есть три задачи с использованием PythonOperator, DummyOperator и BashOperator.

Все эти задачи довольно просты. Вы заметите, что каждый из них выполняет свою функцию и требует разных параметров.

DummyOperator - это просто пустой оператор, который можно использовать для создания шага, который на самом деле ничего не делает, кроме обозначения того, что конвейер завершен.

PythonOperator позволяет вызывать функцию Python и даже передавать ей параметры.

BashOperator позволяет вам вызывать команды bash.

Ниже мы просто напишем задачи. Это не сработает, пока вы не сложите все части вместе.

Используя эти базовые задачи, теперь вы можете начать определять зависимости, порядок, в котором задачи должны выполняться.

Определить зависимости

Есть два способа определить зависимости между задачами.

Первый способ - использовать set_downstream и set_upstream. В этом случае вы можете использовать set_upstream, чтобы python_task зависеть от задачи BASH, или сделать то же самое с последующей версией.

При использовании этой базовой настройки, если задача BASH выполнена успешно, запускается задача Python. Точно так же dummy_task зависит от завершения задачи BASH.

Второй способ определения зависимости - использование оператора битового сдвига. Для тех, кто не знаком с оператором битового сдвига, он выглядит как ›› или ‹<.

Например, если вы хотите указать, что задача Python зависит от задачи BASH, вы можете записать ее как bashtask >> python_task.

А что, если у вас есть несколько задач, зависящих от одной?

Затем вы можете поместить их в список. В этом случае задача Python и dummy_task both зависят от задачи BASH и выполняются параллельно после завершения задачи BASH. Вы можете использовать либо метод set_downstream, либо оператор битового сдвига.

bashtask.set_downstream([python_task, dummy_task])

Ваш первый трубопровод воздушного потока

Теперь, когда мы рассмотрели каждую из частей, мы можем собрать все вместе. Ниже представлен ваш первый базовый трубопровод воздушного потока.

Добавление планировщика воздушного потока DAG

Предполагая, что вы уже инициализировали свою базу данных Airflow, вы можете использовать веб-сервер для добавления в свой новый DAG. Используя следующие команды, вы можете добавить в свой конвейер.

> airflow webserver
> airflow scheduler

Конечный результат появится на панели управления Airflow, как показано ниже.

Спасибо за чтение и удачи в создании ваших будущих конвейеров!