Пропуск задачи Aiflow в правиле триггера ONE_SUCCESS

Я использую правило триггера one_success, так что если какая-либо из родительских задач проходит, а дочерняя задача выполняется, как и ожидалось. Однако у меня возникает проблема, когда оба не работают. В этом случае дочерняя задача будет пропущена вместо ошибки. Ниже представлена ​​реализация dag

import logging
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.hive_operator import HiveOperator
from airflow.operators.bash_operator import BashOperator
from airflow.sensors.web_hdfs_sensor import WebHdfsSensor
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule


circles = ['101','102','103']

def load_hive_partition(circle, **kwargs):

        return HiveOperator(
                    task_id='add_partition_{}'.format(circle),
                    hql='alter table abc.def add partition (event_date="{{ ds }}", circle="'+circle+'") location "/user/cloudera/hive/abc/def/event_date={{ ds }}/circle='+circle+'"',
                    trigger_rule=TriggerRule.ONE_SUCCESS,
                    dag=dag)

def check_hdfs_node1(circle, **kwargs):
    return WebHdfsSensor(
    task_id='source_data_sensor_node1_{}'.format(circle),
    webhdfs_conn_id='webhdfs_default_1',
    filepath='/user/cloudera/hive/abc/def/event_date={{ds}}/circle='+circle,
    timeout=60 * 60 * 24,
    dag=dag
)

def check_hdfs_node2(circle, **kwargs):
    return WebHdfsSensor(
    task_id='source_data_sensor_node2_{}'.format(circle),
    webhdfs_conn_id='webhdfs_default',
    filepath='/user/cloudera/hive/abc/def/event_date={{ds}}/circle='+circle,
    timeout=60 * 60 * 24,
    dag=dag
)

args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'run_as_user': 'airflow',
    'retries': 3,
    'start_date': datetime(year=2020, month=8, day=30),
    'retry_delay': timedelta(minutes=10)
}

dag = DAG(dag_id="HiveLoadPartition_circle",
          default_args=args,
          schedule_interval='30 18 * * *',
          catchup=False)

kinit_bash = BashOperator(
        task_id='kinit_bash',
        bash_command='kinit -kt /usr/local/airflow/keytab.keytab [email protected]',
        dag=dag)



#start_dag = DummyOperator(task_id='start_dag', dag=dag)
end_dag = DummyOperator(task_id='end_dag',trigger_rule=TriggerRule.ALL_DONE, dag=dag)


for circle in circles:
    add_partition = load_hive_partition(circle)
    check_hdfs_1 = check_hdfs_node1(circle)
    check_hdfs_2 = check_hdfs_node2(circle)
    check_hdfs_1.set_upstream(kinit_bash)
    check_hdfs_2.set_upstream(kinit_bash)
    add_partition.set_upstream(check_hdfs_1)
    add_partition.set_upstream(check_hdfs_2)
    end_dag.set_upstream(add_partition)


Просмотр графика

введите описание изображения здесь

Как я могу сделать так, чтобы моя задача hiveload завершилась ошибкой в ​​случае сбоя обоих hdfs_sensor?

ОБНОВЛЕНИЕ:

Я также пробовал использовать правило триггера all_done в конце dag. Даже тогда он запускает end_dag, когда родительская задача пропускается.

введите описание изображения здесь


person Ayush Goyal    schedule 30.10.2020    source источник
comment
Я не думаю, что это возможно - зачем вы пытаетесь этого добиться? Вы хотите отправить сообщение об ошибке или что-то в этом роде?   -  person Philipp Johannis    schedule 30.10.2020
comment
Ваш даг не подключен. Также много пропущенного кода / опечаток - определение add_partition, check_hdfs_1. Исправьте это и обновите вопрос.   -  person Suhas NM    schedule 02.11.2020
comment
@PhilippJohannis, да, если обе ветки потерпели неудачу, я должен отказаться от всего дага и отправить электронное письмо.   -  person Ayush Goyal    schedule 02.11.2020
comment
@SuhasNM, Обновил правильный даг с просмотром графика   -  person Ayush Goyal    schedule 02.11.2020
comment
Я попробовал ваш код, и он работает, как ожидалось. Статус end_task - «Ошибка восходящего потока».   -  person Suhas NM    schedule 03.11.2020


Ответы (1)


Я столкнулся с той же проблемой и считаю, что это основная проблема с воздушным потоком. Я открыл следующий PR, чтобы исправить это https://github.com/apache/airflow/pull/15467

2021-05-04: исправление объединено и помечено для следующего (2.0.3) релиз.

2021-05-23: исправление было развернуто с выпуском 2.1 (версия 2.0.3 была отменена)

Примечание: я повторно добавил свой удаленный ответ в мета-беседу, так как у меня нет 50 необходимых репутации, чтобы прокомментировать мета-поток. Взглянув на правила редактирования, кажется, что он изначально не подходил для удаления, но если я что-то пропустил, и следующее Мне здесь не место, пожалуйста, переместите этот ответ в мета-поток от моего имени, так как я не могу.

Что касается мета-разговора: этот ответ был предоставлен во многом в духе, отраженном @Ivar's комментарий. Поэтому я намерен, чтобы этот ответ помог Аюшу и всем, кто сталкивается с этой проблемой при использовании ‹= 2.0.2. Однако, учитывая сильную отрицательную (-4 голоса на момент написания) реакцию на этот ответ в следующий раз, я просто воздержусь от участия в SO, так как у меня так много времени в день, и было ясно, что определение проблема, ее устранение и открытие PR не соответствуют стандарту SO. ~ Ура

person r-richmond    schedule 29.04.2021
comment
Этот ответ обсуждается на мета - person Zoe; 29.04.2021
comment
Я думаю, вы можете улучшить этот ответ, объяснив, что было не так и как это было исправлено. - person Cerbrus; 29.04.2021