Вопросы по теме 'apache-airflow-xcom'

KeyError: 'ti' в Apache Airflow xcom
Мы пытаемся запустить простой DAG с двумя задачами, которые будут передавать данные через xcom. файл DAG: from __future__ import print_function import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator...
7338 просмотров
schedule 16.01.2023

Xcom pull возвращает NameError на `ti`
Я пытаюсь вставить в xcom значение с ключом last_date в last_date_task , а затем вытащить его из xcom во второй задаче ga_wh_task . когда я тестирую этот даг, запустив python dag.py , он возвращает ошибку в этой строке, например:...
2051 просмотров

Airflow k8s operator xcom - состояние рукопожатия 403 запрещено
Когда я запускаю образ докеры с помощью KubernetesPodOperator в Airflow версии 1.10 После того, как модуль успешно завершает задачу, воздушный поток пытается получить значение xcom, установив соединение с модулем через клиент потока k8s. Ниже...
1584 просмотров

Воздушный поток, XCom и несколько идентификаторов задач
Как работают task_ids, когда указано несколько задач? В этом конкретном примере кода я ожидал получить load_cycle_id_2 из обеих задач в кортеже (5555,22222), но вместо этого он получил (None, 22222). Это почему? from airflow.models import...
2327 просмотров
schedule 20.02.2022

Как передать динамические аргументы оператору воздушного потока?
Я использую Airflow для запуска заданий Spark в Google Cloud Composer. Мне необходимо Создать кластер (параметры YAML предоставляются пользователем) список искровых заданий (параметры задания также предоставляются YAML для каждого задания)...
2632 просмотров

Как я могу xcom_push получить файл, полученный после распаковки файла .gz в BashOperator Airflow?
Я использую BashOperator для распаковки файла .gz в Airflow. gzip -d имя_архива.csv.gz Таким образом, команда gzip заменяет исходный файл .gz распакованным файлом имя_архива.csv. Моя задача в Airflow gzip_file = BashOperator( task_id...
331 просмотров
schedule 05.12.2022

xcomm vales от subdag до dag in airflow composer
Прежде чем я начну, я прошу прощения, так как этот тип вопросов задавался раньше, но мне все еще трудно понять, как выполнить сценарий ниже. Я начал профессионально работать с воздушным потоком и кодированием на Python в течение месяца, поэтому,...
356 просмотров

Использование входных переменных Json в действиях оператора EMR воздушного потока
В настоящее время я следую приведенному здесь шаблону: https://github.com/apache/airflow/blob/master/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py , чтобы создать DAG для вызова экземпляра emr с помощью искровой отправки. При...
985 просмотров

Как извлечь переменную xcom из предыдущего запуска
Как вытащить переменную xcom из предыдущего запуска в воздушном потоке? Является ли это возможным? Я хочу использовать значение из того же task_id в предыдущем run_id в качестве переменной jinja для аргумента data в SimpeHttpOperator....
412 просмотров
schedule 07.06.2022

Как получить значение XCOM в PostgresOperator
Здесь я нажимаю значение XCOM: task_get_username_bash = BashOperator( task_id='execute_bash', bash_command='whoami', xcom_push=True) Таким образом, в XCOM он хранится как {'return_value': '$...
1063 просмотров
schedule 13.08.2022

Невозможно определить проблему с ошибкой задачи GCP Composer (Airflow) DAG
Я новичок в использовании Apache Airflow. Некоторые операторы моего дага имеют статус сбой. Я пытаюсь понять причину ошибки. Вот подробности проблемы: Мой даг довольно большой, и некоторые его части состоят из суб-дагов. Что я заметил в...
691 просмотров

Передача данных между пользовательскими операторами воздушного потока
Я создаю свои собственные пользовательские операторы в воздушном потоке и хотел бы использовать вывод одного оператора в качестве ввода другого оператора. В настоящее время я сохраняю вывод в s3 и читаю их из s3 в следующем операторе, что кажется...
81 просмотров
schedule 27.06.2023

Airflow задайте run_id с параметром из конфигурации JSON
Я хочу автоматически установить для run_id более осмысленное имя. Как я понял, сейчас run_id задается в TriggerDagRunOperator. Я видел в этом потоке предложение по замене TriggerDagRunOperator для данные. Я также хочу, чтобы изменение...
109 просмотров

Airflow2 Динамическое создание Dag после выполнения функции
Здравствуйте, все, что я работаю с воздушным потоком, вот сценарий, который я пытаюсь решить, я хочу динамически создавать DAG после запуска функции try: import os import sys from datetime import timedelta,datetime from airflow...
52 просмотров