Как я могу xcom_push получить файл, полученный после распаковки файла .gz в BashOperator Airflow?

Я использую BashOperator для распаковки файла .gz в Airflow.

gzip -d имя_архива.csv.gz

Таким образом, команда gzip заменяет исходный файл .gz распакованным файлом имя_архива.csv.

Моя задача в Airflow

gzip_file = BashOperator(
    task_id = "gzip_file",
    bash_command = "gzip -d archive_name.csv.gz",
    dag=dag
)

Теперь мне нужно знать имя файла в другой задаче в Airflow, поэтому я хочу, чтобы задача gzip_file передала имя файла с помощью xcom, чтобы моя другая задача могла получить имя файла и использовать его. Как я могу это сделать?


person tank    schedule 09.08.2019    source источник


Ответы (2)


Предполагая, что вы используете последнюю версию Ariflow, вы можете установить для do_xcom_push[1] значение true и отобразить ваш разархивированный файл как последнюю команду, записанную на стандартный вывод, а airflow должен сделать все остальное.

Если BaseOperator.do_xcom_push имеет значение True, последняя строка, записанная в стандартный вывод, также будет отправлена ​​в XCom после завершения команды bash.

Затем нижестоящая задача может использовать xcom pull для получения этого имени файла [2].

[1]https://github.com/apache/airflow/blob/45244e38d386f20838a2cc85fbc72edca843a5e1/airflow/operators/bash_operator.py#L34
[2]https://github.com/apache/airflow/blob/master/airflow/example_dags/example_xcom.py

person Chengzhi    schedule 09.08.2019

BashOperator имеет параметр xcom_push. Если xcom_push имеет значение True, последняя строка, записанная в стандартный вывод, также будет отправлена ​​в XCom после завершения команды bash.

Поэтому запустите команду bash, чтобы последняя строка содержала имя вашего несжатого файла.

gzip_file = BashOperator(
    task_id = "gzip_file",
    bash_command = "gzip -d archive_name.csv.gz | ls archive_name.csv",
    xcom_push=True,
    dag=dag
)
person kaxil    schedule 12.08.2019