Я новичок в использовании Apache Airflow. Некоторые операторы моего дага имеют статус сбой. Я пытаюсь понять причину ошибки.
Вот подробности проблемы: Мой даг довольно большой, и некоторые его части состоят из суб-дагов. Что я заметил в пользовательском интерфейсе Composer, так это то, что все субдаги, которые потерпели неудачу, были выполнены в task_id
с именем download_file
, который использует XCom
с GoogleCloudStorageDownloadOperator
.
>> GoogleCloudStorageDownloadOperator(
task_id='download_file',
bucket="sftp_sef",
object="{{task_instance.xcom_pull(task_ids='find_file') | first }}",
filename="/home/airflow/gcs/data/zips/{{{{ds_nodash}}}}_{0}.zip".format(table)
)
Логи в указанном Subdag не показывают ничего полезного.
БРЕВНО :
[2020-04-07 15: 19: 25,618] {models.py:1359} ИНФОРМАЦИЯ - Все зависимости выполнены для [2020-04-07 15: 19: 25,660] {models.py:1359} ИНФОРМАЦИЯ - Все зависимости выполнены для [2020-04-07 15: 19: 25,660] {models.py:1577} ИНФОРМАЦИЯ -
-------------------------------------------------- ----------------------------- Стартовая попытка 10 из 1
[2020-04-07 15: 19: 25,685] {models.py:1599} ИНФОРМАЦИЯ - Выполнение 2020-04-06T11: 44: 31 + 00: 00 [2020-04-07 15: 19: 25,685] {base_task_runner .py: 118} ИНФОРМАЦИЯ - Запуск: ['bash', '-c', 'airflow run datamart_integration.consentement_email download_file 2020-04-06T11: 44: 31 + 00: 00 --job_id 156313 --pool integration --raw -sd DAGS_FOLDER / datamart / datamart_integration.py --cfg_path / tmp / tmpacazgnve ']
Я не уверен, есть ли что-то, что я не проверяю ... Вот мои вопросы:
- Как отлаживать ошибки в моих группах DAG Composer в целом
- Это хорошая идея - создать локальную среду воздушного потока для локального запуска и отладки моих дагов?
- Как проверить наличие ошибок в XCom?