Использование входных переменных Json в действиях оператора EMR воздушного потока

В настоящее время я следую приведенному здесь шаблону: https://github.com/apache/airflow/blob/master/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py, чтобы создать DAG для вызова экземпляра emr с помощью искровой отправки. При настройке spark_test_steps мне нужно включить переменные, переданные из POST Json, чтобы заполнить отправку искры, как показано ниже:

SPARK_TEST_STEPS = [
    {
        'Name': 'calculate_pi',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',
            'Args': [
                '/usr/lib/spark/bin/run-example',
                'SparkPi',
                kwargs['dag_run'].conf['var_1']
                kwargs['dag_run'].conf['var_2']
                '10'
            ]
        }
    }
]

Как я могу передать переменные, предоставленные POST Json, при этом следуя формату, указанному в ссылке git, чтобы они выглядели так, как показано ниже?

from datetime import timedelta
import airflow
from airflow import DAG
from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor
from airflow.contrib.operators.emr_terminate_job_flow_operator import EmrTerminateJobFlowOperator

DEFAULT_ARGS = {
    'owner': 'Airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False
}

dag = DAG(
    'emr_job_flow_manual_steps_dag',
    default_args=DEFAULT_ARGS,
    dagrun_timeout=timedelta(hours=2),
    schedule_interval='0 3 * * *'
)

var_1 = ''
var_2 = ''
SPARK_TEST_STEPS = []

def define_param(**kwargs):
    global var_1
    global var_2
    global SPARK_TEST_STEPS

    var_1 = str(kwargs['dag_run'].conf['var_1'])
    var_2 = str(kwargs['dag_run'].conf['var_2'])

    SPARK_TEST_STEPS = [
        {
            'Name': 'calculate_pi',
            'ActionOnFailure': 'CONTINUE',
            'HadoopJarStep': {
                'Jar': 'command-runner.jar',
                'Args': [
                    '/usr/lib/spark/bin/run-example',
                    'SparkPi',
                    kwargs['dag_run'].conf['var_1']
                    kwargs['dag_run'].conf['var_2']
                    '10'
                ]
            }
        }
    ]

    return SPARK_TEST_STEPS

DEFINE_PARAMETERS = PythonOperator(
    task_id='DEFINE_PARAMETERS',
    python_callable=define_param,
    provide_context=True,
    dag=dag)

cluster_creator = EmrCreateJobFlowOperator(
    task_id='create_job_flow',
    job_flow_overrides=JOB_FLOW_OVERRIDES,
    aws_conn_id='aws_default',
    emr_conn_id='emr_default',
    dag=dag
)

step_adder = EmrAddStepsOperator(
    task_id='add_steps',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps='{{ ti.xcom_pull(task_ids="DEFINE_PARAMETERS") }}',
    dag=dag
)

step_checker = EmrStepSensor(
    task_id='watch_step',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    step_id="{{ task_instance.xcom_pull('add_steps', key='return_value')[0] }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_remover = EmrTerminateJobFlowOperator(
    task_id='remove_cluster',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    dag=dag
)

cluster_creator.set_downstream(step_adder)
step_adder.set_downstream(step_checker)
step_checker.set_downstream(cluster_remover)

Я не могу использовать Variable.get и Variable.set, так как это не позволит одновременно выполнять несколько вызовов dag для разных типов переменных из-за постоянного изменения глобальных переменных воздушного потока. Я попытался вызвать SPARK_TEST_STEPS с помощью xcom, но тип возвращаемого значения xcom - строка, а для шагов EmrAddStepsOperator требуется список.


person JMV12    schedule 04.10.2019    source источник
comment
как насчет того, чтобы вы использовали xcoms и обойти ограничение ..return type of xcom is string and EmrAddStepsOperator steps requires a list.., но выполняете json.loads(value_returned_by_xcom) `   -  person y2k-shubham    schedule 06.10.2019
comment
Я пробовал использовать json.loads, но по-прежнему получаю ту же ошибку.   -  person JMV12    schedule 07.10.2019


Ответы (1)


Я решил аналогичную проблему, создав собственный оператор, который анализирует json перед выполнением. Причина проблемы в том, что при передаче steps='{{ ti.xcom_pull(task_ids="DEFINE_PARAMETERS") }}',. вы буквально передаете строку со значением, интерполированным механизмом создания шаблонов, она не десериализуется.

from airflow.contrib.hooks.emr_hook import EmrHook
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
import json

class DynamicEmrStepsOperator(EmrAddStepsOperator):
    template_fields = ['job_flow_id', 'steps']
    template_ext = ()
    ui_color = '#f9c915'

    @apply_defaults
    def __init__(
            self,
            job_flow_id=None,
            steps="[]",
            *args, **kwargs):
        super().__init__(
                job_flow_id = job_flow_id,
                steps = steps,
                *args, **kwargs)

    def execute(self, context):
        self.steps = json.loads(self.steps)

        return super().execute(context)
person Eric Cook    schedule 09.01.2020