Резюме

В первом посте нашей серии мы немного узнали о Apache Airflow и о том, как он может помочь нам в создании не только Data Engineering и ETL конвейеры, но также и другие типы соответствующих рабочих процессов в рамках расширенной аналитики, такие как рабочие нагрузки MLOps.

Мы кратко рассмотрели некоторые из его строительных блоков, а именно Датчики, Операторы, Перехватчики и Исполнители. Эти компоненты обеспечивают базовую основу для работы с Apache Airflow. Тогда мы работали с SequentialExecutor, самой простой из возможных настроек Airflow. Имея поддержку для одновременного запуска только одной задачи, он используется в основном для простых демонстраций. Этого явно недостаточно для производственных сценариев, где нам может потребоваться параллельное выполнение множества задач и рабочих процессов.

Как мы уже обсуждали, Apache Airflow поставляется с поддержкой нескольких типов исполнителей - каждый из них больше подходит для определенного типа сценария.

  • LocalExecutor открывает больше возможностей, позволяя выполнять несколько параллельных задач и / или групп DAG. Это достигается за счет использования полноценной СУБД для базы данных метаданных заданий, которая может быть построена поверх базы данных PostgreSql или MySQL. Хотя вы определенно можете запускать некоторые легкие рабочие нагрузки с помощью LocalExecutor, для масштабирования вам придется прибегнуть к вертикальному масштабированию, увеличив аппаратные ресурсы вашей среды.
  • CeleryExecutor устраняет это ограничение: он открывает возможность горизонтального масштабирования настройки, позволяя создать обрабатывающий кластер с несколькими компьютерами. Каждая машина раскручивает одного или нескольких Сельдерей, чтобы разделить рабочую нагрузку Airflow. Связь между рабочими процессами и планировщиком осуществляется через интерфейс брокера сообщений (может быть реализован с помощью Redis или RabbitMQ). Сказав это, вы должны заранее определить количество рабочих и настроить его в конфигурации Airflow, что может сделать настройку немного статичной.
  • DaskExecutor работает аналогично CeleryExecutor, но вместо Celery он использует структуру Dask для достижения возможностей распределенных вычислений.
  • Совсем недавно KubernetesExecutor стал доступен в качестве опции для масштабирования настройки Apache Airflow. Он позволяет развернуть Airflow в кластере Kubernetes. Возможность крутить Kubernetes Pods вверх и вниз предоставляется прямо из коробки. Вы захотите использовать такую ​​настройку, если хотите добавить больше масштаба к своей настройке более гибким и динамичным образом. Однако это происходит за счет управления кластером Kubernetes.

MVP для Apache Airflow

При создании группы данных или возможностей оценка затрат по сравнению с выгодой, а также сложности по сравнению с добавленной стоимостью является критическая, трудоемкая, непростая задача. Гибкие организации и стартапы обычно работают с прототипами, чтобы справиться с таким сценарием - иногда рабочие продукты лучше отвечают на вопросы, чем люди.

Вдохновленные этой философией, мы создадим базовую гипотетическую установку для производственной среды Apache Airflow. У нас будет пошаговое руководство по развертыванию такой среды с помощью LocalExecutor, одного из возможных механизмов задач Apache Airflow.

Для производственного прототипа выбор LocalExecutor оправдан по следующим причинам:

  • Предоставляет возможности параллельной обработки
  • Всего один вычислительный узел - меньше затрат на обслуживание и эксплуатацию
  • Брокеры сообщений не нужны

Местный исполнитель

Вы можете спросить - как это возможно? Как видно из названия, когда мы используем LocalExecutor, мы в основном запускаем все компоненты Airflow из одной и той же физической среды. Когда мы смотрим на архитектуру Apache Airflow, мы говорим об этом:

У нас есть несколько процессов ОС, на которых запущены Веб-сервер, Планировщик и Рабочие. Мы можем рассматривать LocalExecutor в абстрактных терминах как слой, который создает интерфейс между планировщиком и рабочими процессами. Его функция в основном состоит в том, чтобы раскручивать Workers для выполнения задач из Airflow DAG, одновременно отслеживая его статус и выполнение.

Получение вращения колес

У нас было концептуальное введение в LocalExecutor. Без лишних слов, давайте настроим нашу среду. Наша работа будет вращаться вокруг следующего:

  1. Установка и настройка Postgresql
  2. Установка Apache Airflow
  3. Конфигурация Apache Airflow
  4. Тестирование
  5. Настройка Airflow для работы в качестве службы

Эти действия были протестированы с Ubuntu 18.04 LTS, но они должны работать с любым дистрибутивом Linux на основе Debian. Здесь мы предполагаем, что у вас уже настроен Python 3.6+. Если это не так, прочтите этот пост.

Примечание. вы также можете использовать управляемый экземпляр PostgreSql, например База данных Azure для PostgreSql или Amazon RDS для PostgreSql для пример. Фактически, это рекомендуется для производственной установки, поскольку это снимет нагрузку на обслуживание и резервное копирование.

1. Установка и настройка Postgresql

Чтобы установить PostgreSql, мы можем просто запустить в командной строке следующее:

sudo apt-get install postgresql postgresql-contrib

Через несколько секунд PostgreSql должен быть установлен.

Далее нам нужно его настроить. Первый шаг - создание объекта psql:

sudo -u postgres psql

Приступаем к настройке необходимого пользователя, базы данных и разрешений:

postgres=# CREATE USER airflow PASSWORD 'airflow'; #you might wanna change this
CREATE ROLE
postgres=# CREATE DATABASE airflow;
CREATE DATABASE
postgres=# GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA public TO airflow;
GRANT

Наконец, нам нужно установить libpq-dev, чтобы мы могли реализовать клиент PostgreSql:

sudo apt install libpq-dev

Необязательный шаг 1: вы можете сделать вашу установку более безопасной, ограничив соединения с вашей базой данных только на локальном компьютере. Для этого вам нужно изменить IP-адреса в файле pg_hba.conf:

sudo vim /etc/postgresql/12/main/pg_hba.conf

Необязательный шаг 2: вы можете настроить PostgreSql на автоматический запуск при каждой загрузке. Сделать это:

sudo update-rc.d postgresql enable

2. Установка Apache Airflow

Мы установим Airflow и его зависимости с помощью pip:

pip install apache-airflow['postgresql']
pip install psycopg2

К настоящему времени у вас должен быть установлен Airflow. По умолчанию Airflow устанавливается в ~ / .local / bin. Не забудьте запустить следующую команду:

export PATH=$PATH:/home/your_user/.local/bin/

Это необходимо для того, чтобы система знала, где разместить двоичный файл Airflow.

Примечание. в этом примере мы не используем virtualenv или Pipenv, но вы можете использовать их. Просто убедитесь, что зависимости среды правильно сопоставлены, когда вы настраиваете Airflow для работы в качестве службы :)

3. Конфигурация Apache Airflow

Теперь нам нужно настроить Airflow для использования LocalExecutor и нашей базы данных PostgreSql.

Перейдите в каталог установки Airflow и отредактируйте airflow.cfg.

vim airflow.cfg

Убедитесь, что для параметра исполнителя задано значение LocalExecutor и соответственно задана строка подключения SqlAlchemy:

Наконец, нам нужно инициализировать нашу базу данных:

airflow initdb

Убедитесь, что сообщения об ошибках не выводятся как часть вывода initdb.

4. Тестирование

Пришло время проверить, правильно ли работает Airflow. Для этого мы запускаем Планировщик и Веб-сервер:

airflow scheduler
airflow webserver

После того, как вы запустите браузер и укажете IP-адрес своего компьютера, вы должны увидеть новую установку Airflow:

5. Настройка Airflow для работы в качестве службы.

Наш последний шаг - настроить демон для служб планировщика и веб-сервера. Это необходимо для того, чтобы обеспечить автоматический перезапуск Airflow в случае сбоя или после перезагрузки нашей машины.

В качестве начального шага нам нужно настроить Gunicorn. Поскольку по умолчанию он не установлен глобально, нам нужно создать для него символическую ссылку.

sudo ln -fs $(which gunicorn) /bin/gunicorn

Затем мы создаем служебные файлы для веб-сервера и планировщика:

sudo touch /etc/systemd/system/airflow-webserver.service
sudo touch /etc/systemd/system/airflow-scheduler.service

Наш airflow-webserver.service должен выглядеть следующим образом:

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# “License”); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
[Unit]
Description=Airflow webserver daemon
After=network.target postgresql.service mysql.service
Wants=postgresql.service mysql.service
[Service]
EnvironmentFile=/etc/environment
User=airflow
Group=airflow
Type=simple
ExecStart= /home/airflow/.local/bin/airflow webserver
Restart=on-failure
RestartSec=5s
PrivateTmp=true
[Install]
WantedBy=multi-user.target

Аналогичным образом мы добавляем в airflow-scheduler.service следующее содержимое:

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# “License”); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
[Unit]
Description=Airflow scheduler daemon
After=network.target postgresql.service mysql.service
Wants=postgresql.service mysql.service
[Service]
EnvironmentFile=/etc/environment
User=airflow
Group=airflow
Type=simple
ExecStart=/home/airflow/.local/bin/airflow scheduler
Restart=always
RestartSec=5s
[Install]
WantedBy=multi-user.target

Примечание: в зависимости от каталога, в который вы установили Airflow, возможно, потребуется изменить переменную ExecStart.

Теперь нам просто нужно перезагрузить наш системный демон, включить и запустить наши службы:

sudo systemctl daemon-reload
sudo systemctl enable airflow-scheduler.service
sudo systemctl start airflow-scheduler.service
sudo systemctl enable airflow-webserver.service
sudo systemctl start airflow-webserver.service

Наши услуги должны были запуститься успешно. Чтобы подтвердить это:

$ sudo systemctl status airflow-webserver.service
$ sudo systemctl status airflow-scheduler.service

Вы должны увидеть вывод о том, что обе службы активны и включены. Например, для веб-сервера вы должны увидеть что-то подобное:

Вот и все. Теперь у вас есть базовая производственная установка для Apache Airflow с использованием LocalExecutor, который позволяет запускать группы DAG, содержащие параллельные задачи, и / или запускать несколько групп DAG одновременно. Это определенно необходимо для любого серьезного случая использования, который я также планирую продемонстрировать в следующем посте.

Конечно, здесь есть много возможных улучшений:

  • Самый очевидный способ - автоматизировать эти шаги, например, создав конвейер CICD с помощью модуля Runbook Ansible.
  • Использование более безопасных учетных данных PostgreSql для Airflow и их более безопасное хранение. Мы могли бы сохранить их как секретную переменную в конвейере CICD и настроить их как переменные среды вместо хранения в airflow.cfg.
  • Ограничение разрешений для пользователей Airflow как в ОС, так и в PostgreSql

Но пока мы оставим эти шаги для будущей статьи.

Я рад, что вы добрались до этого места, и надеюсь, что вы сочли это полезным. Посмотрите другие мои статьи: