Airflow2 Динамическое создание Dag после выполнения функции

Здравствуйте, все, что я работаю с воздушным потоком, вот сценарий, который я пытаюсь решить, я хочу динамически создавать DAG после запуска функции

try:
    import os
    import sys

    from datetime import timedelta,datetime
    from airflow import DAG

    from airflow.operators.python_operator import PythonOperator
    from airflow.operators.email_operator import EmailOperator
    from airflow.utils.trigger_rule import TriggerRule
    from airflow.utils.task_group import TaskGroup
    import pandas as pd

    print("All Dag modules are ok ......")

except Exception as e:
    print("Error  {} ".format(e))


# ===============================================
default_args = {
    "owner": "airflow",
    "start_date": datetime(2021, 1, 1),
    "retries": 1,
    "retry_delay": timedelta(minutes=1),
    'email': ['[email protected]'],
    'email_on_failure': True,
    'email_on_retry': False,
}
dag = DAG(dag_id="project", schedule_interval="@once", default_args=default_args, catchup=False)
# ================================================


class XcomHelper(object):

    def __init__(self, **context):
        self.context = context

    def get(self, key=None):
        """ Get the Value from XCOM"""
        try:
            return self.context.get("ti").xcom_pull(key=key)
        except Exception as e: return "Error"

    def push(self, key=None, value=None):

        """Push the value on session """
        try:
            self.context['ti'].xcom_push(key=key, value=value)
            return True
        except Exception as e: return False



def create_dag(dag_id,schedule,dag_number,default_args):

    def hello_world_py():
        print('Hello World')

    dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)

    with dag:
        t1 = PythonOperator(task_id=dag_id,python_callable=hello_world_py)

    return dag


def simple_task(**context):

    DATA = ["soumil", "Shah"]
    
    for n in range(1, len(DATA)):
        try:
            dag_id = 'hello_world_{}'.format(str(n))
            print("DAG ID : {} ".format(dag_id))
            default_args = {'owner': 'airflow','start_date': datetime(2018, 1, 1)}
            schedule = '@daily'
            dag_number = n
            globals()[dag_id] = create_dag(dag_id,schedule, dag_number,default_args)
        except Exception as e:
            print("Error : {} ".format(e))

with DAG(dag_id="project", schedule_interval="@once", default_args=default_args, catchup=False) as dag:

    simple_task = PythonOperator(task_id="simple_task",
                                 python_callable=simple_task,
                                 provide_context=True)


simple_task


Я хочу создать эти даги на основе len переменной DATA, что данные поступают из базы данных

я пытался заглянуть в

любая помощь была бы замечательной

Пересмотренный код:

try:
    import os
    import sys

    from datetime import timedelta, datetime
    from airflow import DAG

    from airflow.operators.python_operator import PythonOperator

    # from airflow.operators.email_operator import EmailOperator
    # from airflow.utils.trigger_rule import TriggerRule
    # from airflow.utils.task_group import TaskGroup
    # import pandas as pd

    print("All Dag modules are ok ......")

except Exception as e:
    print("Error  {} ".format(e))


def create_dag(dag_id, schedule, dag_number, default_args):
    def hello_world_py():
        print('Hello World')

    dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)

    with dag:
        t1 = PythonOperator(task_id=dag_id, python_callable=hello_world_py)

    return dag


def simple_task():
    DATA = ["soumil", "Shah", "Shah2"]

    for n in range(0, len(DATA)):
        try:
            dag_id = 'hello_world_{}'.format(str(n))
            print("DAG ID : {} ".format(dag_id))
            default_args = {'owner': 'airflow', 'start_date': datetime(2018, 1, 1)}
            schedule = '@daily'
            dag_number = n
            globals()[dag_id] = create_dag(dag_id, schedule, dag_number, default_args)
        except Exception as e:
            print("Error : {} ".format(e))

    def trigger_function():
        print("HEREE")
        simple_task()

    with DAG(dag_id="project", schedule_interval="@once", default_args={'owner': 'airflow', 'start_date': datetime(2018, 1, 1)}, catchup=False) as dag:


        trigger_function = PythonOperator(task_id="trigger_function",python_callable=trigger_function,provide_context=True,)


    trigger_function

person Soumil Nitin Shah    schedule 31.03.2021    source источник
comment
Вам нужен даг "проект"? Ваш текущий код создает даг, который генерирует даги.   -  person jay.cs    schedule 01.04.2021
comment
ну, я хочу сгенерировать DAG на основе функции, как только я выполню эту функцию, она скажет, что теперь вернет 2 элемента, что я хочу сделать, это создать 2 DAG, если это имеет смысл   -  person Soumil Nitin Shah    schedule 01.04.2021


Ответы (1)


Я удалил несколько строк из вашего кода, чтобы ответить по существу. Приведенный ниже код будет генерировать группы DAG, такие как hello_world_0, hello_world_1..., на основе содержимого DATA.

РЕДАКТИРОВАТЬ - я использовал воздушный поток v1.10.x, но код должен работать для v2.x

Предложения:

  1. Сделайте имена задач отличными от имен DAG.
  2. dag_number переменная в настоящее время не используется. Это можно снять.

Группы DAG будут выглядеть так -

введите описание изображения здесь

try:
    import os
    import sys

    from datetime import timedelta, datetime
    from airflow import DAG

    from airflow.operators.python_operator import PythonOperator

    # from airflow.operators.email_operator import EmailOperator
    # from airflow.utils.trigger_rule import TriggerRule
    # from airflow.utils.task_group import TaskGroup
    # import pandas as pd

    print("All Dag modules are ok ......")

except Exception as e:
    print("Error  {} ".format(e))


def create_dag(dag_id, schedule, dag_number, default_args):
    def hello_world_py():
        print('Hello World')

    dag = DAG(dag_id, schedule_interval=schedule, default_args=default_args)

    with dag:
        t1 = PythonOperator(task_id=dag_id, python_callable=hello_world_py)

    return dag


def simple_task():
    DATA = ["soumil", "Shah", "Shah2"]

    for n in range(0, len(DATA)):
        try:
            dag_id = 'hello_world_{}'.format(str(n))
            print("DAG ID : {} ".format(dag_id))
            default_args = {'owner': 'airflow', 'start_date': datetime(2018, 1, 1)}
            schedule = '@daily'
            dag_number = n
            globals()[dag_id] = create_dag(dag_id, schedule, dag_number, default_args)
        except Exception as e:
            print("Error : {} ".format(e))


simple_task()
person jay.cs    schedule 31.03.2021
comment
Большое спасибо, однако я хочу запустить процесс создания DAG, когда я вручную нажимаю кнопку в пользовательском интерфейсе. - person Soumil Nitin Shah; 01.04.2021
comment
Я обновил описание выше, я хочу запустить это только при нажатии из пользовательского интерфейса Вручную, вы можете помочь - person Soumil Nitin Shah; 01.04.2021
comment
SubDag может быть более подходящим с измененным вариантом использования. DAG, генерирующий другой DAG, обычно не рекомендуется и я бы сказал, что это невозможно. - person jay.cs; 01.04.2021
comment
stackoverflow.com/questions/62962386/ - person jay.cs; 01.04.2021
comment
Большое спасибо, вот что я пытаюсь достичь. Существует веб-сайт A, который вводит информацию о задаче, которая теперь добавляется в базу данных в воздушном потоке. Я хочу создать Dag на основе информации, введенной пользователем, и динамически генерировать этот dag. Проблема возникает, когда пользователь добавляет элемент, я хочу создать для него Dag. теперь мне нужно вернуться на панель управления воздушным потоком, которая запустит скрипт и сделает даг, что, по вашему мнению, здесь лучше всего - person Soumil Nitin Shah; 01.04.2021