Я использую правило триггера one_success, так что если какая-либо из родительских задач проходит, а дочерняя задача выполняется, как и ожидалось. Однако у меня возникает проблема, когда оба не работают. В этом случае дочерняя задача будет пропущена вместо ошибки. Ниже представлена реализация dag
import logging
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.hive_operator import HiveOperator
from airflow.operators.bash_operator import BashOperator
from airflow.sensors.web_hdfs_sensor import WebHdfsSensor
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
circles = ['101','102','103']
def load_hive_partition(circle, **kwargs):
return HiveOperator(
task_id='add_partition_{}'.format(circle),
hql='alter table abc.def add partition (event_date="{{ ds }}", circle="'+circle+'") location "/user/cloudera/hive/abc/def/event_date={{ ds }}/circle='+circle+'"',
trigger_rule=TriggerRule.ONE_SUCCESS,
dag=dag)
def check_hdfs_node1(circle, **kwargs):
return WebHdfsSensor(
task_id='source_data_sensor_node1_{}'.format(circle),
webhdfs_conn_id='webhdfs_default_1',
filepath='/user/cloudera/hive/abc/def/event_date={{ds}}/circle='+circle,
timeout=60 * 60 * 24,
dag=dag
)
def check_hdfs_node2(circle, **kwargs):
return WebHdfsSensor(
task_id='source_data_sensor_node2_{}'.format(circle),
webhdfs_conn_id='webhdfs_default',
filepath='/user/cloudera/hive/abc/def/event_date={{ds}}/circle='+circle,
timeout=60 * 60 * 24,
dag=dag
)
args = {
'owner': 'airflow',
'depends_on_past': False,
'run_as_user': 'airflow',
'retries': 3,
'start_date': datetime(year=2020, month=8, day=30),
'retry_delay': timedelta(minutes=10)
}
dag = DAG(dag_id="HiveLoadPartition_circle",
default_args=args,
schedule_interval='30 18 * * *',
catchup=False)
kinit_bash = BashOperator(
task_id='kinit_bash',
bash_command='kinit -kt /usr/local/airflow/keytab.keytab [email protected]',
dag=dag)
#start_dag = DummyOperator(task_id='start_dag', dag=dag)
end_dag = DummyOperator(task_id='end_dag',trigger_rule=TriggerRule.ALL_DONE, dag=dag)
for circle in circles:
add_partition = load_hive_partition(circle)
check_hdfs_1 = check_hdfs_node1(circle)
check_hdfs_2 = check_hdfs_node2(circle)
check_hdfs_1.set_upstream(kinit_bash)
check_hdfs_2.set_upstream(kinit_bash)
add_partition.set_upstream(check_hdfs_1)
add_partition.set_upstream(check_hdfs_2)
end_dag.set_upstream(add_partition)
Просмотр графика
Как я могу сделать так, чтобы моя задача hiveload завершилась ошибкой в случае сбоя обоих hdfs_sensor?
ОБНОВЛЕНИЕ:
Я также пробовал использовать правило триггера all_done в конце dag. Даже тогда он запускает end_dag, когда родительская задача пропускается.