Как передать динамические аргументы оператору воздушного потока?

Я использую Airflow для запуска заданий Spark в Google Cloud Composer. Мне необходимо

  • Создать кластер (параметры YAML предоставляются пользователем)
  • список искровых заданий (параметры задания также предоставляются YAML для каждого задания)

С помощью Airflow API я могу читать файлы YAML и передавать переменные между задачами с помощью xcom.

Но обратите внимание на DataprocClusterCreateOperator()

  • cluster_name
  • project_id
  • zone

и несколько других аргументов помечены как шаблонные.

Что, если я хочу передать другие аргументы в качестве шаблонных (что в настоящее время не так)? - например, image_version, num_workers, worker_machine_type и т. д.?

Есть ли обходной путь для этого?


person sudeepgupta90    schedule 22.03.2019    source источник


Ответы (1)


Не уверен, что вы имеете в виду под «динамическим», но при обновлении файла yaml, если процесс чтения файла находится в теле файла dag, dag будет обновлен для применения новых аргументов из файла yaml. Так что на самом деле вам не нужен XCOM, чтобы получить аргументы. просто создайте словарь параметров, а затем перейдите к default_args:

CONFIGFILE = os.path.join(
    os.path.dirname(os.path.realpath(\__file__)), 'your_yaml_file')

with open(CONFIGFILE, 'r') as ymlfile:
    CFG = yaml.load(ymlfile)

default_args = {
    'cluster_name': CFG['section_A']['cluster_name'], # edit here according to the structure of your yaml file.
    'project_id': CFG['section_A']['project_id'],
    'zone': CFG['section_A']['zone'],
    'mage_version': CFG['section_A']['image_version'],
    'num_workers': CFG['section_A']['num_workers'],
    'worker_machine_type': CFG['section_A']['worker_machine_type'],
    # you can add all needs params here.
}

DAG = DAG(
    dag_id=DAG_NAME,
    schedule_interval=SCHEDULE_INTEVAL,
    default_args=default_args, # pass the params to DAG environment
)

Task1 = DataprocClusterCreateOperator(
    task_id='your_task_id',
    dag=DAG
)

Но если вам нужны динамические теги, а не аргументы, вам может потребоваться другая стратегия, например this.

Так что вам, вероятно, нужно понять основную идею: на каком уровне находится динамика? Уровень задачи? Уровень DAG?

Или вы можете создать своего собственного оператора, который будет выполнять работу и принимать параметры.

person AC at CA    schedule 22.03.2019
comment
Спасибо за ваш ответ. чтение файла yaml в качестве словаря параметров сработало для меня. - person sudeepgupta90; 25.03.2019
comment
прочтите, пожалуйста, об этом вопросе, у меня проблема в том же коде stackoverflow.com/questions/57410960/ - person Bhagesh Arora; 08.08.2019