Здравствуйте, все, что я работаю с воздушным потоком, вот сценарий, который я пытаюсь решить, я хочу динамически создавать DAG после запуска функции
try:
import os
import sys
from datetime import timedelta,datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.utils.task_group import TaskGroup
import pandas as pd
print("All Dag modules are ok ......")
except Exception as e:
print("Error {} ".format(e))
# ===============================================
default_args = {
"owner": "airflow",
"start_date": datetime(2021, 1, 1),
"retries": 1,
"retry_delay": timedelta(minutes=1),
'email': ['[email protected]'],
'email_on_failure': True,
'email_on_retry': False,
}
dag = DAG(dag_id="project", schedule_interval="@once", default_args=default_args, catchup=False)
# ================================================
class XcomHelper(object):
def __init__(self, **context):
self.context = context
def get(self, key=None):
""" Get the Value from XCOM"""
try:
return self.context.get("ti").xcom_pull(key=key)
except Exception as e: return "Error"
def push(self, key=None, value=None):
"""Push the value on session """
try:
self.context['ti'].xcom_push(key=key, value=value)
return True
except Exception as e: return False
def create_dag(dag_id,schedule,dag_number,default_args):
def hello_world_py():
print('Hello World')
dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)
with dag:
t1 = PythonOperator(task_id=dag_id,python_callable=hello_world_py)
return dag
def simple_task(**context):
DATA = ["soumil", "Shah"]
for n in range(1, len(DATA)):
try:
dag_id = 'hello_world_{}'.format(str(n))
print("DAG ID : {} ".format(dag_id))
default_args = {'owner': 'airflow','start_date': datetime(2018, 1, 1)}
schedule = '@daily'
dag_number = n
globals()[dag_id] = create_dag(dag_id,schedule, dag_number,default_args)
except Exception as e:
print("Error : {} ".format(e))
with DAG(dag_id="project", schedule_interval="@once", default_args=default_args, catchup=False) as dag:
simple_task = PythonOperator(task_id="simple_task",
python_callable=simple_task,
provide_context=True)
simple_task
Я хочу создать эти даги на основе len переменной DATA, что данные поступают из базы данных
я пытался заглянуть в
- https://www.astronomer.io/guides/dynamically-generating-dags
- Может ли задача Airflow динамически создавать DAG во время выполнения? < / а>
- https://medium.com/@flavio.mtps/making-use-of-python-globals-to-dynamically-create-airflow-dags-124e556b704e
любая помощь была бы замечательной
Пересмотренный код:
try:
import os
import sys
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
# from airflow.operators.email_operator import EmailOperator
# from airflow.utils.trigger_rule import TriggerRule
# from airflow.utils.task_group import TaskGroup
# import pandas as pd
print("All Dag modules are ok ......")
except Exception as e:
print("Error {} ".format(e))
def create_dag(dag_id, schedule, dag_number, default_args):
def hello_world_py():
print('Hello World')
dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)
with dag:
t1 = PythonOperator(task_id=dag_id, python_callable=hello_world_py)
return dag
def simple_task():
DATA = ["soumil", "Shah", "Shah2"]
for n in range(0, len(DATA)):
try:
dag_id = 'hello_world_{}'.format(str(n))
print("DAG ID : {} ".format(dag_id))
default_args = {'owner': 'airflow', 'start_date': datetime(2018, 1, 1)}
schedule = '@daily'
dag_number = n
globals()[dag_id] = create_dag(dag_id, schedule, dag_number, default_args)
except Exception as e:
print("Error : {} ".format(e))
def trigger_function():
print("HEREE")
simple_task()
with DAG(dag_id="project", schedule_interval="@once", default_args={'owner': 'airflow', 'start_date': datetime(2018, 1, 1)}, catchup=False) as dag:
trigger_function = PythonOperator(task_id="trigger_function",python_callable=trigger_function,provide_context=True,)
trigger_function