Вопросы по теме 'apache-airflow-xcom'
KeyError: 'ti' в Apache Airflow xcom
Мы пытаемся запустить простой DAG с двумя задачами, которые будут передавать данные через xcom.
файл DAG:
from __future__ import print_function
import airflow
from airflow import DAG
from airflow.operators.bash_operator import BashOperator...
7338 просмотров
schedule
16.01.2023
Xcom pull возвращает NameError на `ti`
Я пытаюсь вставить в xcom значение с ключом last_date в last_date_task , а затем вытащить его из xcom во второй задаче ga_wh_task . когда я тестирую этот даг, запустив python dag.py , он возвращает ошибку в этой строке, например:...
2051 просмотров
schedule
17.05.2024
Airflow k8s operator xcom - состояние рукопожатия 403 запрещено
Когда я запускаю образ докеры с помощью KubernetesPodOperator в Airflow версии 1.10
После того, как модуль успешно завершает задачу, воздушный поток пытается получить значение xcom, установив соединение с модулем через клиент потока k8s.
Ниже...
1584 просмотров
schedule
08.02.2023
Воздушный поток, XCom и несколько идентификаторов задач
Как работают task_ids, когда указано несколько задач?
В этом конкретном примере кода я ожидал получить load_cycle_id_2 из обеих задач в кортеже (5555,22222), но вместо этого он получил (None, 22222).
Это почему?
from airflow.models import...
2327 просмотров
schedule
20.02.2022
Как передать динамические аргументы оператору воздушного потока?
Я использую Airflow для запуска заданий Spark в Google Cloud Composer. Мне необходимо
Создать кластер (параметры YAML предоставляются пользователем)
список искровых заданий (параметры задания также предоставляются YAML для каждого задания)...
2632 просмотров
schedule
11.04.2022
Как я могу xcom_push получить файл, полученный после распаковки файла .gz в BashOperator Airflow?
Я использую BashOperator для распаковки файла .gz в Airflow.
gzip -d имя_архива.csv.gz
Таким образом, команда gzip заменяет исходный файл .gz распакованным файлом имя_архива.csv.
Моя задача в Airflow
gzip_file = BashOperator(
task_id...
331 просмотров
schedule
05.12.2022
xcomm vales от subdag до dag in airflow composer
Прежде чем я начну, я прошу прощения, так как этот тип вопросов задавался раньше, но мне все еще трудно понять, как выполнить сценарий ниже.
Я начал профессионально работать с воздушным потоком и кодированием на Python в течение месяца, поэтому,...
356 просмотров
schedule
21.05.2023
Использование входных переменных Json в действиях оператора EMR воздушного потока
В настоящее время я следую приведенному здесь шаблону: https://github.com/apache/airflow/blob/master/airflow/contrib/example_dags/example_emr_job_flow_manual_steps.py , чтобы создать DAG для вызова экземпляра emr с помощью искровой отправки. При...
985 просмотров
schedule
08.11.2022
Как извлечь переменную xcom из предыдущего запуска
Как вытащить переменную xcom из предыдущего запуска в воздушном потоке? Является ли это возможным?
Я хочу использовать значение из того же task_id в предыдущем run_id в качестве переменной jinja для аргумента data в SimpeHttpOperator....
412 просмотров
schedule
07.06.2022
Как получить значение XCOM в PostgresOperator
Здесь я нажимаю значение XCOM:
task_get_username_bash = BashOperator(
task_id='execute_bash',
bash_command='whoami',
xcom_push=True)
Таким образом, в XCOM он хранится как {'return_value': '$...
1063 просмотров
schedule
13.08.2022
Невозможно определить проблему с ошибкой задачи GCP Composer (Airflow) DAG
Я новичок в использовании Apache Airflow. Некоторые операторы моего дага имеют статус сбой. Я пытаюсь понять причину ошибки.
Вот подробности проблемы: Мой даг довольно большой, и некоторые его части состоят из суб-дагов. Что я заметил в...
691 просмотров
schedule
05.05.2022
Передача данных между пользовательскими операторами воздушного потока
Я создаю свои собственные пользовательские операторы в воздушном потоке и хотел бы использовать вывод одного оператора в качестве ввода другого оператора. В настоящее время я сохраняю вывод в s3 и читаю их из s3 в следующем операторе, что кажется...
81 просмотров
schedule
27.06.2023
Airflow задайте run_id с параметром из конфигурации JSON
Я хочу автоматически установить для run_id более осмысленное имя.
Как я понял, сейчас run_id задается в TriggerDagRunOperator. Я видел в этом потоке предложение по замене TriggerDagRunOperator для данные.
Я также хочу, чтобы изменение...
109 просмотров
schedule
13.03.2023
Airflow2 Динамическое создание Dag после выполнения функции
Здравствуйте, все, что я работаю с воздушным потоком, вот сценарий, который я пытаюсь решить, я хочу динамически создавать DAG после запуска функции
try:
import os
import sys
from datetime import timedelta,datetime
from airflow...
52 просмотров
schedule
23.10.2023