Скажем, у меня есть даг, в котором одна задача зависит от 4 задач. Все 4 задачи должны запускаться только в том случае, если та же задача предыдущего запуска dag была успешной. Таким образом, все задачи имеют значение depends_on_past как True. Однако последняя задача дага - это задача очистки, которая должна запускаться всегда. Таким образом, в случае сбоя задачи предыдущего дня задача текущего дня не запускается, поэтому последняя задача также не запускается. Как можно было бы решить эту проблему?
неуспешная задача, основанная на Debian_on_past в операторах воздушного потока
Ответы (1)
Всегда устанавливайте trigger_rule=TriggerRule.ALL_DONE
в своих задачах очистки (те задачи, которые должны всегда запускается, независимо от статуса восходящих задач)
И хотя вы, кажется, уже поняли это, также может иметь смысл установить depends_on_past=False
в ваших задачах очистки.
person
y2k-shubham
schedule
07.08.2020
all_done
будет иметь смысл только в том случае, если была запущена восходящая задача, верно? Итак, если задача была запущена, независимо от результата. если нижележащая задача имеет all_done
, она будет запущена. Однако в случае зависимости_on_past задача не запускается. Так что all_done не помогает.
- person PiyushC; 07.08.2020
Понял. [1] Но если вы скажете, что восходящие задачи вообще не запускались, то каково состояние DAG? (он все еще работает?) [2] очень неаккуратным решением было бы реализовать это настраиваемое поведение самостоятельно (вместо того, чтобы использовать
depends_on_past
): ваша задача пропущена (или не удалась, если хотите) вручную путем повышения AirflowSkipException
или AirflowFailException
в случае, если предыдущий экземпляр задачи не имеет ' t еще не закончено (для этого потребуется запросить мета-базу данных или использовать модель SQLAlchemy TaskInstance
).
- person y2k-shubham; 07.08.2020
[1] да, он остается в рабочем состоянии. [2] Да, я могу это сделать вручную. Есть и другие способы. Но казалось странным, что Airflow имеет эту функцию, но не поддерживает этот сценарий.
- person PiyushC; 07.08.2020
Но если DAG останется в рабочем состоянии, то в конечном итоге задача очистки все равно будет запущена, верно? Я думаю, что то, что вы пытаетесь сделать здесь (запускать задачу очистки, как только некоторые восходящие задачи застревают из-за
depends_on_past
), противоречит принципу зависимостей задач Airflow, поэтому я не вижу простого способа добиться этого.
- person y2k-shubham; 07.08.2020
Я ожидал, поскольку задача предыдущего запуска не удалась, задача текущего запуска перейдет в состояние, которое может быть обработано каким-либо правилом триггера, чтобы можно было продолжить выполнение последующей задачи. Но да, я могу понять, почему наличие чего-то подобного может противоречить предполагаемому поведению зависимого_on_past (когда вы исправляете первую неудачную задачу, а затем все последующие задачи запускаются автоматически). Все еще надеялся, что с этим можно справиться для этого сценария, который я хочу решить.
- person PiyushC; 07.08.2020