Я пытаюсь использовать Airflow для выполнения 11 шагов на AWS EMR и следую этому код для справки. Поскольку использование EmrAddStepsOperator и EmrStepSensor для 11 шагов было бы слишком большим повторением. Итак, я пытаюсь пройти через это. Я использовал приведенный ниже код в своей группе DAG.
step_adder = list()
step_checker = list()
steps = ['step1', 'step2', 'step3', 'step4', 'step5', 'step6'...till step11]
# @evalcontextfilter
# def dangerous_render(context, value):
# return Markup(Template(value).render(context)).render()
for i in range(0,len(steps)):
#Add step
step_adder.append(EmrAddStepsOperator(
task_id=steps[i],
job_flow_id="{{ task_instance.xcom_pull(task_ids='create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=eval('step_'+str(i+1)),
))
print(step_adder)
#Step Sensor for checking
step_checker.append(EmrStepSensor(
task_id=steps[i]+'_check',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
#step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",
step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}").render({'params': {'step': steps[i]}}))',
aws_conn_id='aws_default',
))
Я столкнулся с ошибкой, EmrStepSensor ожидает, что step_id от EMR будет вводиться здесь, и это генерируется полученным от xcom (я думаю, я не на 100% уверен, как работает этот код). Но мой шаг хранится в списке шагов, поэтому я не могу указать статическое значение здесь, в task_id в step_id, как указано в справочном коде, и я не могу понять, как использовать шаблон jinja со значением переменной python, чтобы поместить значения сюда из списка шагов.
Я использовал оба приведенных ниже способа, чтобы step_id мог получить правильный шаг из EMR в соответствии с именем шага в steps [i]
step_id="{{"task_instance.xcom_pull(task_ids={}, key='return_value')[0]",steps[i]}}",
step_id='(Template("{{ "task_instance.xcom_pull(task_ids=params.step, key='return_value')[0] }}")
Однако оба из них не удалось с синтаксической ошибкой в Airflow. Так что, если кто-нибудь может указать мне правильное направление для этого, я был бы очень признателен. Я использую Airflow 1.10.12 (это версия Airflow по умолчанию в Managed Apache Airflow на AWS).