Невозможно использовать переменную python в шаблоне jinja с Airflow

Я пытаюсь использовать Airflow для выполнения 11 шагов на AWS EMR и следую этому код для справки. Поскольку использование EmrAddStepsOperator и EmrStepSensor для 11 шагов было бы слишком большим повторением. Итак, я пытаюсь пройти через это. Я использовал приведенный ниже код в своей группе DAG.

step_adder = list()
step_checker = list()
steps = ['step1', 'step2', 'step3', 'step4', 'step5', 'step6'...till step11]

# @evalcontextfilter
# def dangerous_render(context, value):
#     return Markup(Template(value).render(context)).render()

for i in range(0,len(steps)):
        #Add step
    step_adder.append(EmrAddStepsOperator(
        task_id=steps[i],
        job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
        aws_conn_id='aws_default',
        steps=eval('step_'+str(i+1)),
    ))
    print(step_adder)
        #Step Sensor for checking
    step_checker.append(EmrStepSensor(
        task_id=steps[i]+'_check',
        job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
        #step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",
        step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}").render({'params': {'step': steps[i]}}))',
        aws_conn_id='aws_default',
    ))

Я столкнулся с ошибкой, EmrStepSensor ожидает, что step_id от EMR будет вводиться здесь, и это генерируется полученным от xcom (я думаю, я не на 100% уверен, как работает этот код). Но мой шаг хранится в списке шагов, поэтому я не могу указать статическое значение здесь, в task_id в step_id, как указано в справочном коде, и я не могу понять, как использовать шаблон jinja со значением переменной python, чтобы поместить значения сюда из списка шагов.

Я использовал оба приведенных ниже способа, чтобы step_id мог получить правильный шаг из EMR в соответствии с именем шага в steps [i]

step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",

step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}")

Однако оба из них не удалось с синтаксической ошибкой в ​​Airflow. Так что, если кто-нибудь может указать мне правильное направление для этого, я был бы очень признателен. Я использую Airflow 1.10.12 (это версия Airflow по умолчанию в Managed Apache Airflow на AWS).


person bha159    schedule 03.02.2021    source источник


Ответы (2)


Я не уверен, что это уже решено, поэтому:

Использование f-строк:

f"{{{{ task_instance.xcom_pull(task_ids='{steps[i]}', key='return_value')[0] }}}}"

Использование .format: "{{{{ task_instance.xcom_pull(task_ids='{}', key='return_value')[0] }}}}".format(steps[i])

Обратите внимание, что вы должны убедиться, что значение ключа task_ids заключено в одинарные кавычки. Кроме того, возврат из xcom_pull - это список, поэтому индекс [0] в конце o

person Ricardo    schedule 10.03.2021
comment
Это было то, что я искал, помогало [0], иначе код не получал правильный идентификатор шага. - person bha159; 15.03.2021

Ты можешь сделать:

step_id= " {{{{ task_instance.xcom_pull(task_ids={}, key='return_value') }}}} ".format(steps[i])

или с f-string:

step_id= f" {{{{ task_instance.xcom_pull(task_ids={steps[i]}, key='return_value') }}}} "
person Elad    schedule 03.02.2021
comment
Ни один из них не работал. В журналах я вижу, что ни один шаг не был выбран [2021-02-03 17: 40: 11,438] {{emr_step_sensor.py:53}} ИНФОРМАЦИЯ - Poking step () в кластере j-xxxxxx [2021-02-03 17 : 40: 11,497] {{taskinstance.py:1150}} ОШИБКА. Произошла ошибка (InvalidRequestException) при вызове операции DescribeStep: идентификатор шага '()' недействителен. Отслеживание (последний вызов последний): - person bha159; 03.02.2021
comment
Мне удалось выполнить эту работу step_id = {{{{task_instance.xcom_pull (task_ids = '{}', key = 'return_value')}}}} .format (steps [i]) Однако отсюда я получаю неправильный формат , Я получил сообщение об ошибке ['s-xxxx'] не является действительным идентификатором шага - person bha159; 03.02.2021
comment
@ bha159, не могли бы вы показать, что вы видите на вкладке рендеринга. Может быть, код, который вы разместили в своем вопросе, не идентичен коду, который вы действительно запускаете? - person Elad; 03.02.2021
comment
См. Здесь DAG и журнал ошибок imgur.com/a/Moko0it - person bha159; 04.02.2021
comment
@ bha159 Вы не показали рендер. Пожалуйста, отредактируйте свой вопрос и добавьте изображение вкладки рендеринга. также добавьте журнал трассировки. Вы рубите бревно - невозможно понять, что произошло. - person Elad; 04.02.2021
comment
Извините, я только начал использовать Airflow, поэтому не понимаю, где находится вкладка рендеринга. Не могли бы вы предоставить более подробную информацию по этому поводу. Я загрузил еще две фотографии с полными журналами и вкладкой рендеринга в пользовательском интерфейсе, надеюсь, это то, что нужно для получения дополнительной информации - person bha159; 04.02.2021
comment
@ bha159 в графическом представлении щелкните задачу, которая не выполняется, и выберите Rendered. - person Elad; 04.02.2021
comment
сделал то же самое и получил этот результат на этой странице i.imgur.com/EWDXQTk.png - person bha159; 05.02.2021