Почему не удалось выполнить задачу Airflow PythonOperator, но код возврата равен 0?

У меня есть Airflow DAG, который работает с PythonOperator, мне интересно, почему моя задача не выполняется, но выходит с кодом возврата 0?

Ошибка выполнения с нулевым кодом возврата заставила меня считать задачу успешно выполненной.

Вы можете увидеть ниже журналы вакансий или прикрепленное изображение, может ли кто-нибудь объяснить, почему это произошло, и посоветовать, как этого избежать?

Журналы экземпляров задачи:

[2019-11-15 22: 45: 23,633] {base_task_runner.py:115} ИНФОРМАЦИЯ - Задание 736: Подзадача http_request_send_push 2019-11-15 22: 45: 23,632 - 10688 - ОШИБКА - 74 - http_request_send_push: http_request_send_push service trigger-Resend -push error ::

[2019-11-15 22: 45: 23,633] {logging_mixin.py:112} ИНФОРМАЦИЯ -

[2019-11-15 22: 45: 23,632] {notification.py:74} ОШИБКА - http_request_send_push: http_request_send_push ошибка службы триггера-повторной отправки-push ::

[2019-11-15 22: 45: 23,633] {python_operator.py:114} ИНФОРМАЦИЯ - Готово. Возвращенное значение было: Нет

[2019-11-15 22: 45: 25,251] {logging_mixin.py:112} ИНФОРМАЦИЯ -

[2019-11-15 22: 45: 25,250] {local_task_job.py:103} ИНФОРМАЦИЯ - Задача завершена с кодом возврата 0

Снимок экрана журнала экземпляра задачи:

введите здесь описание изображения

Снимок экрана с представлением в виде дерева DAG:

введите здесь описание изображения


person shawn    schedule 15.11.2019    source источник
comment
Это зависит от того, что вы запускаете с помощью PythonOperator. Вы можете вставить это, пожалуйста?   -  person kaxil    schedule 15.11.2019
comment
Предположим, что журнал и скриншот, которые я загрузил, уже ясно описывают проблему, я вставлю скрипты, и любые предложения будут признательны. Спасибо.   -  person shawn    schedule 16.11.2019


Ответы (2)


Проще говоря, PythonOperator - это просто оператор, выполняющий функцию Python. Если есть какие-либо ошибки, и вы хотите, чтобы задача находилась в failed состоянии, вам нужно поднять исключение внутри вызываемой функции python. В приведенном ниже примере кода см. fourth_task.

Альтернативой этому является использование ShortCircuitOperator. Ниже приводится описание из справки по API Apache Airflow. руководство:

Он оценивает условие и замыкает рабочий процесс, если условие имеет значение False. Все последующие задачи помечаются как «пропущенные». Если условие истинно, последующие задачи выполняются как обычно.

См. Приведенный ниже пример кода, который объясняет разницу между PythonOperator и ShortCircuitOperator. Также показано, как вызвать исключение и изменить состояние задачи на failed.

def first_task(**kwargs):
    logging.info("first_task")


def second_task(**kwargs):
    logging.info("second_task")
    return True


def third_task(**kwargs):
    logging.info("third_task")
    return False


def fourth_task(**kwargs):
    logging.info("fourth_task")
    raise Exception()


def fifth_task(**kwargs):
    logging.info("fifth_task")
    return True


def sixth_task(**kwargs):
    logging.info("sixth_task")
    return False

first_task = PythonOperator(
    task_id='first_task',
    provide_context=True,
    python_callable=first_task,
    dag=dag)
first_task_successor = DummyOperator(task_id='first_task_successor', dag=dag)
first_task_successor.set_upstream(first_task)


second_task = PythonOperator(
    task_id='second_task',
    provide_context=True,
    python_callable=second_task,
    dag=dag)
second_task_successor = DummyOperator(task_id='second_task_successor', dag=dag)
second_task_successor.set_upstream(second_task)


third_task = PythonOperator(
    task_id='third_task',
    provide_context=True,
    python_callable=third_task,
    dag=dag)
third_task_successor = DummyOperator(task_id='third_task_successor', dag=dag)
third_task_successor.set_upstream(third_task)


fourth_task = PythonOperator(
    task_id='fourth_task',
    provide_context=True,
    python_callable=fourth_task,
    dag=dag)
fourth_task_successor = DummyOperator(task_id='fourth_task_successor', dag=dag)
fourth_task_successor.set_upstream(fourth_task)


fifth_task = ShortCircuitOperator(
    task_id='fifth_task',
    provide_context=True,
    python_callable=fifth_task,
    dag=dag)
fifth_task_successor = DummyOperator(task_id='fifth_task_successor', dag=dag)
fifth_task_successor.set_upstream(fifth_task)

sixth_task = ShortCircuitOperator(
    task_id='sixth_task',
    provide_context=True,
    python_callable=sixth_task,
    dag=dag)
sixth_task_successor = DummyOperator(task_id='sixth_task_successor', dag=dag)
sixth_task_successor.set_upstream(sixth_task)

Снимок экрана:  введите описание изображения здесь

person Sai Neelakantam    schedule 15.11.2019
comment
Большое спасибо, я попробовал способ, который вы предлагаете. После добавления метода raise Exception () задача-преемник не запустилась должным образом. - person shawn; 16.11.2019

@kaxil Коды приведены ниже.

#! /usr/bin/env python
# -*- coding: utf-8 -*-

import inspect
import urllib.request
import airflow
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import timedelta

args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(1),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
}

dag = DAG(
    dag_id='airflow_so',
    catchup=False,
    default_args=args,
    dagrun_timeout=timedelta(minutes=5),
    schedule_interval=timedelta(seconds=10)
)

def http_request_send_push(ds, **kwargs):
    endpoint='http://10.19.54.110:8080/v1/trigger-scheduled-push'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception as e:
        print('%s:%s:%s',
                 inspect.stack()[0][3],
                 type(e),
                 e)
    else:
        req = response.read()
        print('%s:%s:%s',
                 inspect.stack()[0][3],
                 type(req),
                 req)

    endpoint='http://10.19.54.110:8080/v1/trigger-scheduled-repush'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception as e:
        print('%s:%s:%s',
                 inspect.stack()[0][3],
                 type(e),
                 e)
    else:
        req = response.read()
        print('%s:%s:%s',
                 inspect.stack()[0][3],
                 type(req),
                 req)

http_request_send_push = PythonOperator(
    task_id='http_request_send_push',
    provide_context=True,
    python_callable=http_request_send_push,
    dag=dag
)


def http_request_send_sms(ds, **kwargs):
    endpoint='http://10.19.54.134:8080/v1/scheduleSendSms'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception as e:
        print('%s:%s:%s',
                 inspect.stack()[0][3],
                 type(e),
                 e)
    else:
        req = response.read()
        print('%s:%s:%s',
                 inspect.stack()[0][3],
                 type(req),
                 req)

    endpoint='http://10.19.54.134:8080/v1/scheduleReSendSms'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception as e:
        print('%s:%s:%s',
                 inspect.stack()[0][3],
                 type(e),
                 e)
    else:
        req = response.read()
        print('%s:%s:%s',
                 inspect.stack()[0][3],
                 type(req),
                 req)

http_request_send_sms = PythonOperator(
    task_id='http_request_send_sms',
    provide_context=True,
    python_callable=http_request_send_sms,
    dag=dag
)


def http_request_send_email(ds, **kwargs):
    endpoint='http://10.19.54.138:8080/v1/scheduleSendEmail'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception as e:
        print('%s:%s:%s',
                 inspect.stack()[0][3],
                 type(e),
                 e)
    else:
        req = response.read()
        print('%s:%s:%s',
                 inspect.stack()[0][3],
                 type(req),
                 req)

    endpoint='http://10.19.54.138:8080/v1/scheduleReSendEmail'
    try:
        response = urllib.request.urlopen(endpoint, timeout=10)
    except Exception as e:
        print('%s:%s:%s',
                 inspect.stack()[0][3],
                 type(e),
                 e)
    else:
        req = response.read()
        print('%s:%s:%s',
                 inspect.stack()[0][3],
                 type(req),
                 req)

http_request_send_email = PythonOperator(
    task_id='http_request_send_email',
    provide_context=True,
    python_callable=http_request_send_email,
    dag=dag
)

http_request_send_push >> http_request_send_sms >> http_request_send_email

if __name__ == "__main__":
    dag.cli()


person shawn    schedule 16.11.2019