Apache Airflow - это инструмент, созданный сообществом для программного создания, планирования и мониторинга рабочих процессов. Самым большим преимуществом Airflow является то, что он не ограничивает объем трубопроводов. Airflow можно использовать для построения моделей машинного обучения, передачи данных или управления инфраструктурой. Давайте подробнее рассмотрим популярный инструмент управления рабочим процессом.

Чистый питон

Apache Airflow - один из немногих проектов Apache, написанных на Python.

Принимая во внимание, что наши пользователи в основном специалисты по обработке данных и инженеры по обработке данных, это дает нам огромное преимущество, поскольку большинство из них знакомы с python - не только с языком, но и со всей экосистемой, которую они используют ежедневно. И это чистая работа Python (змеи, змеи везде)! Airflow не только написан на Python, но и ожидает, что вы будете писать свои рабочие процессы с использованием этого языка! Да, вы поняли - рабочие процессы Airflow написаны на чистом Python!

Больше никаких декларативных XML или YAML. У этого подхода есть много плюсов, в том числе:

  • Динамические рабочие процессы с использованием циклов `for` и ` while` и других программных конструкций на Python
  • Формат даты и времени, используемый для планирования задач
  • Легко добавлять новые интеграции - они тоже написаны на Python!
  • В отличие от ряда «облачных» решений для движков рабочих процессов - здесь нет необходимости знать Docker, образы, контейнеры, реестры. Вы все еще можете это сделать (поскольку Airflow имеет отличную интеграцию докеров и Kubernetes), но вам просто нужно знать Python, чтобы выполнять свою работу.
  • Поскольку Airflow не привязан к контейнеру OCI / Docker - он может легко повторяться с любым контейнерным подходом (например, Singularity - популярным среди исследователей биоинформатики).

Вот пример простого рабочего процесса Airflow:

import time
import datetime
from airflow.models import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.email.operators.email import EmailOperator
from airflow.utils.dates import days_ago
def task_function(random_base):
   """This function will be called during workflow execution"""
   time.sleep(random_base)
with DAG(
   "example_workflow",
   default_args={"start_date": days_ago(1)},
   schedule_interval=datetime.timedelta(minutes=40),
) as dag:
   start_email = EmailOperator(
       task_id="start_email",
       to="[email protected]",
       subject="The pipeline has started",
       html_content="<p>Your pipeline has started</p>"
   )
   end_email = EmailOperator(
       task_id="end_email",
       to="[email protected]",
       subject="The pipeline has finishes",
       html_content="<p>Your pipeline has finished</p>"
   )
   for i in range(5):
       task = PythonOperator(
           task_id='sleep_for_' + str(i),
           python_callable=task_function,
           op_kwargs={'random_base': float(i) / 10},
           dag=dag,
       )
       start_email >> task >> end_email

В этом примере мы видим, что с помощью EmailOperator мы определяем две задачи: `start_email` и ` end_email`. . Затем в цикле `for` мы добавляем следующую задачу, которая выполнит вызываемую Python ` task_function`, которая просто спит.

Интересная часть - это строка с `start_email ›› задача ›› end_email`. Потому что там мы определяем взаимосвязь между каждой задачей цикла и задачами `start_email` и ` end_email`. Таким образом, Airflow сначала запустит задачу `start_email`, а затем выполнит все задачи ` sleep_for_ * `. и, наконец, он отправит другое электронное письмо с помощью задачи `end_email`. Вот как этот рабочий процесс выглядит в Airflow WebUI:

Один инструмент, чтобы управлять ими всеми

Самым важным в Airflow является то, что это «оркестратор». Airflow не обрабатывает данные самостоятельно, Airflow только сообщает другим, что и когда нужно делать. И это самое большое преимущество Airflow. Хотя он разработан для поддержки обработки данных и отлично справляется с этим, рабочие процессы этим не ограничиваются. Рабочие процессы Airflow могут сочетать множество разных задач. Например, рабочий процесс может создать инфраструктуру обработки данных, использовать ее для выполнения рабочей нагрузки данных, генерировать и отправлять отчеты и, наконец, разрушить инфраструктуру для минимизации затрат.

Более того, Airflow уже имеет множество (буквально несколько сотен!) Интеграций, которые справятся со всей работой. Это действительно упрощает процесс создания конвейеров. Все, что вам нужно сделать, это выбрать несколько уже существующих операторов и смешать их в желаемом порядке. И даже если вы пропустите какие-то интеграции - добавить новую очень просто! Все, что вам нужно сделать, это создать простой класс Python.

Вот краткий список интеграций, которые Airflow поддерживает в настоящее время:

  • Команда Bash
  • Вызываемый Python
  • Kubernetes (например, беговые поды)
  • Докер (например, работающие контейнеры)
  • Облачная платформа Google (например, вакансии Dataproc, загрузка GCS)
  • Amazon Web Services (например, запрос Redshift, загрузка S3)
  • Службы Azure
  • В последнее время - контейнеры Singularity (популярны среди людей с биоинформатическим опытом)
  • … Многие, многие, многие другие…

Интервалы данных

Есть одна вещь, которая сбивает с толку, но при этом открывает глаза для многих новых пользователей Airflow. Airflow лучше всего подходит для обработки интервалов данных. Это не решение для потоковой обработки данных (и Airflow на него не претендует). Airflow работает лучше всего, когда есть пакетные фиксированные интервалы обработки данных. Хотя поначалу это кажется ограничивающим, это позволяет вам контролировать качество ваших данных. Представьте, что вы понимаете, что за последние несколько недель ваши ежедневные обрабатываемые данные должны быть повторно обработаны, потому что вы обнаружили и исправили ошибку в конвейере обработки. Или ваши метаданные изменились, и вы можете лучше классифицировать свои данные за последние несколько недель.

Один из замечательных примеров от пользователей Airflow - обработка телекоммуникационных данных, в которых вы сопоставляете (IMEI) [https://pl.wikipedia.org/wiki/International_Mobile_Equipment_Identity] телефона с конкретными моделями телефонов. Сопоставление IMEI с моделями телефонов обычно задерживается на дни или недели, и когда вы получаете новые метаданные, вы хотели бы повторно обработать несколько недель уже обработанных данных с минимальным временем обработки. Airflow с его подходом, ориентированным на интервалы данных, возможностями обратной засыпки и повторной обработкой только необходимой части конвейера данных - идеальное решение для этого!

Если вы хотите узнать больше об Airflow и его внутреннем устройстве, присоединяйтесь к нам на нашем предстоящем выступлении на ODSC East « Введение в Apache Airflow «!

Ярек Потюк, главный инженер-программист и член PMC Apache Airflow, Polidea | Фонд программного обеспечения Apache

В качестве технического директора Джаред построил программное обеспечение в 10 раз: от 6 до 60 человек. Проработав несколько лет техническим директором, он решил вернуться к штатной должности инженера и работает главным инженером-программистом в своей компании (и очень этому рад). Ярек работал инженером во многих отраслях - телекоммуникациях, разработке мобильных приложений, Google, робототехнике и искусственном интеллекте, облачных вычислениях и обработке данных с открытым исходным кодом. В настоящее время Ярек работает менеджером по маркетингу в проекте Apache Airflow.

https://www.linkedin.com/in/jarekpotiuk/
https://github.com/potiuk

Томаш Урбашек, инженер-программист и коммиттер Apache Airflow, Polidea | Фонд программного обеспечения Apache

Томек - инженер-программист в Polidea, коммиттер Apache Airflow и любитель книг. Ему нравится функциональное программирование, потому что он дипломированный математик. Каждый день он пытается сделать мир лучше.

https://www.linkedin.com/in/tomaszurbaszek/
https://github.com/nuclearpinguin