Флаг Catchup в dag Airflow не работает должным образом

Шаги (сегодня 7 января 2020 года):

1) Поместите следующий тег в каталог Airflow:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

dag = DAG(dag_id='example_dag', start_date=datetime(2020, 1, 1), catchup=False)

t1 = BashOperator(task_id='bash_task', bash_command='echo Hola!', dag=dag)

Обратите внимание на флаг наверстывания, препятствующий расписанию Airflow в даты с истекшим сроком действия.

2) Запустите свежий экземпляр Airflow

3) Включите даг в UI

4) Исполнение:

введите здесь описание изображения

Я действительно не понимаю, почему эти даги с истекшими датами (5 января и 6 января) планируются и выполняются, если я использую флаг наверстывания и развертываю 7 января. Какие-нибудь советы? Спасибо!

Обновление: без флага наверстывания я получил:

введите здесь описание изображения

So:

1) учитывается флаг наверстывания

2) кажется, что в нем есть ошибка или он плохо настроен, потому что, когда он установлен на False, Airflow все еще выполняет планирование на даты с истекшим сроком (5 января и 6 января).


person italktothewind    schedule 07.01.2020    source источник
comment
stackoverflow.com/q/58563313/2956135   -  person Emma    schedule 08.01.2020
comment
Но ведь это же ошибка? Флаг наверстывания не позволяет Airflow заполнять предыдущие вакансии, а вакансии с 5 по 6 января определенно устарели для 7 января.   -  person italktothewind    schedule 08.01.2020
comment
5 января кажется неправильным, но 6 января правильным для 7 января. Триггеры воздушного потока срабатывают после покрытия schedule_interval. и по умолчанию schedule_interval составляет один день. airflow.apache.org/docs/stable/scheduler.html.   -  person Emma    schedule 08.01.2020


Ответы (1)


Как я уже упоминал в https://stackoverflow.com/a/61740904/5691525, планировщик воздушного потока создает запуск DAG для самый последний экземпляр серии интервалов DAG, когда catchup=False, но была ошибка ( при использовании объекта timedelta для schedule_interval), который создал 2 DagRun.

В вашем случае вы не передали поле schedule_interval в DAG, из-за чего он принимает значение по умолчанию. По умолчанию используется timedelta(days=1) (https://github.com/apache/airflow/blob/3ad4f96bae78f16a2240567f65831ca269672d7b/airflow/models/dag.py#L212), поэтому было создано 2 DagRun.

Это будет исправлено в Airflow 1.10.11 и исправлено для мастера с помощью this PR.

person kaxil    schedule 11.05.2020