Как получить значение XCOM в PostgresOperator

Здесь я нажимаю значение XCOM:

task_get_username_bash = BashOperator(
                task_id='execute_bash',
                bash_command='whoami',
                xcom_push=True)

Таким образом, в XCOM он хранится как {'return_value': '$ USER'} (в моем случае $ USER = 'airflow').

Затем я хочу получить это return_value из XCOM:

task_insert_new_row = PostgresOperator(
                task_id='insert_new_row',
                trigger_rule=TriggerRule.ALL_DONE,
                sql='''INSERT INTO table_name VALUES
                (%s, %s, %s);''',
                parameters=(uuid.uuid4().int % 123456789,
                            "{{ ti.xcom_pull(task_ids='execute_bash', key='return_value') }}",
                            datetime.now()))

Но PostgresOperator интерпретирует ссылку на макрос как str. Как вытащить XCOM в PostgresOperator?


person Vladyslav    schedule 27.03.2020    source источник


Ответы (2)


Задача решена:

task_insert_new_row = PostgresOperator(
                task_id='insert_new_row',
                trigger_rule=TriggerRule.ALL_DONE,
                sql='''INSERT INTO table_name VALUES
                (%s, '{{ ti.xcom_pull(task_ids='execute_bash', key='return_value') }}', %s);''',
                parameters=(uuid.uuid4().int % 123456789, datetime.now()))
person Vladyslav    schedule 28.03.2020
comment
Что делать, если у меня есть параметр sql=file.sql, и я не могу его изменить, я могу передать только некоторые параметры, полученные из переменных внутри DAG, и один из них имеет содержимое ti.xcom_pull(...)? - person eddy85br; 05.06.2020
comment
@Vladyslav, знаете ли вы, как отправить возвращаемое значение по postgres другому оператору. Представьте, что вместо сценария bash у вас есть оператор sql. Если вы знаете, пожалуйста, помогите мне с этим в stackoverflow.com/questions/67614989/ - person Maryam Pashmi; 20.05.2021

@ eddy85br - Вы также можете использовать шаблоны и макросы jinja в своих файлах "* .sql" (например, если вы хотите передать Execution_date для фильтрации набора результатов). Проверить

https://forum.astronomer.io/t/how-can-i-pass-sql-as-a-file-w-airflows-postgres-operator/355

person Ozgur G    schedule 05.06.2020