Невозможно определить проблему с ошибкой задачи GCP Composer (Airflow) DAG

Я новичок в использовании Apache Airflow. Некоторые операторы моего дага имеют статус сбой. Я пытаюсь понять причину ошибки.

Вот подробности проблемы: Мой даг довольно большой, и некоторые его части состоят из суб-дагов. Что я заметил в пользовательском интерфейсе Composer, так это то, что все субдаги, которые потерпели неудачу, были выполнены в task_id с именем download_file, который использует XCom с GoogleCloudStorageDownloadOperator.


>> GoogleCloudStorageDownloadOperator(
    task_id='download_file',
    bucket="sftp_sef",
    object="{{task_instance.xcom_pull(task_ids='find_file') | first }}",
    filename="/home/airflow/gcs/data/zips/{{{{ds_nodash}}}}_{0}.zip".format(table)
)

Логи в указанном Subdag не показывают ничего полезного.

БРЕВНО :

[2020-04-07 15: 19: 25,618] {models.py:1359} ИНФОРМАЦИЯ - Все зависимости выполнены для [2020-04-07 15: 19: 25,660] {models.py:1359} ИНФОРМАЦИЯ - Все зависимости выполнены для [2020-04-07 15: 19: 25,660] {models.py:1577} ИНФОРМАЦИЯ -

-------------------------------------------------- ----------------------------- Стартовая попытка 10 из 1

[2020-04-07 15: 19: 25,685] {models.py:1599} ИНФОРМАЦИЯ - Выполнение 2020-04-06T11: 44: 31 + 00: 00 [2020-04-07 15: 19: 25,685] {base_task_runner .py: 118} ИНФОРМАЦИЯ - Запуск: ['bash', '-c', 'airflow run datamart_integration.consentement_email download_file 2020-04-06T11: 44: 31 + 00: 00 --job_id 156313 --pool integration --raw -sd DAGS_FOLDER / datamart / datamart_integration.py --cfg_path / tmp / tmpacazgnve ']

Я не уверен, есть ли что-то, что я не проверяю ... Вот мои вопросы:

  1. Как отлаживать ошибки в моих группах DAG Composer в целом
  2. Это хорошая идея - создать локальную среду воздушного потока для локального запуска и отладки моих дагов?
  3. Как проверить наличие ошибок в XCom?



Ответы (1)


Относительно ваших трех вопросов:

Во-первых, при использовании Cloud Composer у вас есть несколько способов отладки ошибок в вашем коде. Согласно документации, вам следует:

  1. Проверьте журналы воздушного потока.

Эти журналы относятся к отдельным задачам DAG. Их можно просмотреть в папке журналов облачного хранилища и в интерфейсе Web Airflow.

Когда вы создаете среду Cloud Composer, также создается Cloud Storage Bucket и связывается с ней. Таким образом, Cloud Composer хранит журналы для отдельных задач DAG в папке журналов внутри этого сегмента, каждая папка рабочего процесса имеет папку для своих DAG и вложенных DAG. Вы можете проверить его структуру здесь.

Что касается веб-интерфейса Airflow, он обновляется каждые 60 секунд. Кроме того, вы можете узнать об этом подробнее здесь.

  1. Ознакомьтесь с операционным пакетом Google Cloud.

Вы можете использовать облачный мониторинг и Cloud Logging с помощью Cloud Composer. В то время как Cloud Monitoring обеспечивает видимость производительности и общего состояния облачных приложений, Cloud Logging показывает журналы, которые создают планировщик и рабочие контейнеры. Таким образом, вы можете использовать оба или только тот, который считаете более полезным, в зависимости от ваших потребностей.

  1. В Cloud Console проверьте наличие ошибок на страницах компонентов Google Cloud, на которых запущена ваша среда.

  2. В веб-интерфейсе Airflow проверьте представление графика группы DAG. для экземпляров невыполненных задач.

Таким образом, эти шаги рекомендуются при устранении неполадок вашей DAG.

Во-вторых, что касается тестирования и отладки, рекомендуется разделить производственную и тестовую среду, чтобы избежать вмешательства DAG.

Кроме того, вы можете протестировать свой DAG локально, в документации по этой теме есть руководство, здесь. Локальное тестирование позволяет выявлять синтаксические ошибки и ошибки в задачах. Однако я должен указать, что невозможно будет проверить / оценить зависимости и связь с базой данных.

В-третьих, как правило, чтобы проверить ошибки в Xcom, вы должны проверить:

  • Если есть код / ​​номер ошибки;
  • Проверьте правильность синтаксиса по образцу кода из документации;
  • Проверьте, не устарели ли пакеты;

Я хотел бы отметить, что согласно этой документации путь Параметр GoogleCloudStorageDownloadOperator был обновлен до GCSToLocalOperator.

Кроме того, я также рекомендую вам взглянуть на это: код и документацию, чтобы проверить синтаксис и ошибки Xcom.

Не стесняйтесь поделиться со мной кодом ошибки, если вам понадобится дополнительная помощь.

person Alexandre Moraes    schedule 08.04.2020
comment
Спасибо ! Я буду держать вас в курсе в следующие дни - person Imad; 09.04.2020
comment
Хорошо, не волнуйтесь. Если вы найдете информацию полезной, примите ответ. - person Alexandre Moraes; 09.04.2020
comment
Хотя документация, кажется, указывает на устаревание GoogleCloudStorageDownloadOperator, текущая версия Airflow в Composer - это 1.10.10, я полагаю? - person Imad; 10.04.2020
comment
@Aetos, согласно документации самый последний воздушный поток версия, которую поддерживает Cloud Composer, - 1.10.6. - person Alexandre Moraes; 10.04.2020
comment
Хотя я не могу найти модуль providers при попытке импортировать его из _2 _... Возможно, я где-то ошибаюсь? - person Imad; 10.04.2020
comment
Когда я просматриваю их репозиторий GitHub, он там ... быть несоответствие между документацией и тем, что присутствует - person Imad; 10.04.2020
comment
И pip install apache-airflow[gcp] возвращает Successfully installed apache-airflow-1.10.10 - person Imad; 10.04.2020
comment
@Aetos, я выполнил команду gcloud beta composer environments list-upgrades ENVIRONMENT_NAME \ --location LOCATION , чтобы проверить доступные обновления, и результат был IMAGE VERSION composer-1.10.0-airflow-1.10.6. Кроме того, если вы выполнили команду gcloud beta composer environments update ENVIRONMENT_NAME \ --location LOCATION --airflow-version=VERSION и заменили VERSION на 1.10.10, вы получите сообщение об ошибке Invalid image version: - person Alexandre Moraes; 10.04.2020
comment
Кроме того, вы можете запустить gcloud composer environments describe example-environment --location europe-west1 и проверить свою версию воздушного потока. Что касается обновления, это просто нормализация пути, здесь . - person Alexandre Moraes; 10.04.2020