Воздушный поток, XCom и несколько идентификаторов задач

Как работают task_ids, когда указано несколько задач?

В этом конкретном примере кода я ожидал получить load_cycle_id_2 из обеих задач в кортеже (5555,22222), но вместо этого он получил (None, 22222).

Это почему?

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

args = {
    'owner': 'airflow',
    'start_date': datetime.now(),
    'provide_context': True
}

demo_dag = DAG(dag_id='first', start_date=datetime.now(), schedule_interval='@once',default_args=args)

def push_load_id(**kwargs):
    kwargs['ti'].xcom_push(key='load_cycle_id_2',value=22222)
    kwargs['ti'].xcom_push(key='load_cycle_id_3',value=44444)

def another_push_load_id(**kwargs):
    kwargs['ti'].xcom_push(key='load_cycle_id_2',value=5555)
    kwargs['ti'].xcom_push(key='anotherload_cycle_id_3',value=6666)

def pull_load_id(**kwargs):
    ti = kwargs['ti'].xcom_pull(key='load_cycle_id_2', task_ids=['another_push_load_id','push_load_id'])
    print(ti)

push_operator = PythonOperator(task_id='push_load_id', python_callable=push_load_id, dag=demo_dag)
pull_operator = PythonOperator(task_id='pull_load_id', python_callable=pull_load_id, dag=demo_dag)

push_operator >> pull_operator

person dirtyw0lf    schedule 22.12.2018    source источник
comment
Вам не хватает задачи, в которой используется another_push_load_id : push_operator_1 = PythonOperator(task_id='push_load_id1', python_callable=another_push_load_id, dag=demo_dag)   -  person kaxil    schedule 24.12.2018
comment
...конечно! я думаю, смотрю на это слишком долго!   -  person dirtyw0lf    schedule 26.12.2018


Ответы (1)


В ваших дагах работают только push_load_id иpull_load_id функции. Вы не создаете оператор, использующий функцию another_push_load_id.

Конец вашего кода должен выглядеть так:

push_operator = PythonOperator(task_id='push_load_id', python_callable=push_load_id, dag=demo_dag)
another_push_operator = PythonOperator(task_id='push_load_id', python_callable= another_push_load_id, dag=demo_dag)
pull_operator = PythonOperator(task_id='pull_load_id', python_callable=pull_load_id, dag=demo_dag)

push_operator >> another_push_operator >> pull_operator
person mik-laj    schedule 26.12.2018