Пример Airflow kubernetesPorOperator не запускается

При попытке запустить образец kubernetesPodOperator получает:

[2020-05-25 20: 00: 40,475] {{init .py: 51}} ИНФОРМАЦИЯ - Использование исполнителя LocalExecutor
[2020-05-25 20: 00: 40,475] {{ dagbag.py:396}} ИНФОРМАЦИЯ - заполнение DagBag из /usr/local/airflow/dags/kubernetes_example.py
│ │ Traceback (последний вызов последним):
│ │ Файл "/ usr / local / bin / airflow ", строка 37, в
│ args.func (args)
│ File" /usr/local/lib/python3.7/site-packages/airflow/utils/cli.py ", строка 75, в оболочке
│ │ return f (* args, ** kwargs)
│ Файл" /usr/local/lib/python3.7/site-packages/airflow/bin/cli. py ", строка 523, выполняется
│ │ dag = get_dag (args)
│ Файл" /usr/local/lib/python3.7/site-packages/airflow/bin/cli.py ", строка 149, в get_dag
│ 'parse.'. format (args.dag_id))
│ airflow.exceptions.AirflowException: dag_id не может быть найден: kubernetes_example. Либо дага не существовало, либо его не удалось разобрать.

Это код, который я использую:

from airflow import DAG
from datetime import datetime, timedelta
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.dates import days_ago



default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'email': ['[email protected]'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=60)
}

dag = DAG(
    'kubernetes_example', default_args=default_args, schedule_interval=timedelta(minutes=60))


start = DummyOperator(task_id='run_this_first', dag=dag)

passing = KubernetesPodOperator(namespace='airflow',
                          image="python:3.6.10",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="passing-test",
                          task_id="passing-task",
                          env_vars={'EXAMPLE_VAR': '/example/value'},
                          in_cluster=True,
                          get_logs=True,
                          dag=dag
                          )

failing = KubernetesPodOperator(namespace='airflow',
                          image="ubuntu:18.04",
                          cmds=["Python","-c"],
                          arguments=["print('hello world')"],
                          labels={"foo": "bar"},
                          name="fail",
                          task_id="failing-task",
                          get_logs=True,
                          dag=dag
                          )

passing.set_upstream(start)
failing.set_upstream(start)

Я только что взял его у исполнителя сэмплов. Кто-нибудь сталкивался с этой проблемой?

Спасибо!


person DarkSpark    schedule 25.05.2020    source источник
comment
вы можете разместить свой код? Я почти уверен, что ваше определение DAG неверно, отсутствует dag_id   -  person mucio    schedule 25.05.2020
comment
Спасибо за комментарий, добавил DAG   -  person DarkSpark    schedule 25.05.2020
comment
Спасибо, проверьте мой ответ ниже   -  person mucio    schedule 25.05.2020


Ответы (1)


У вас должно быть имя (dag_id для вашего дага).

dag = DAG(
    dag_id='kubernetes_example', 
    default_args=default_args, 
    schedule_interval=timedelta(minutes=60)
)

Также ваш task_id должен иметь _ не - и быть: task_id="failing_task"

person mucio    schedule 25.05.2020
comment
Спасибо за помощь, но проблема не в этом. Я изменил DAG в соответствии с вашим комментарием, и это не решило проблему. Кстати, вы можете увидеть это в учебнике (airflow.apache.org/docs/stable/tutorial. html), что это не обязательно. все равно попробовал, и у меня это не сработало. все еще ценю помощь - person DarkSpark; 26.05.2020
comment
извините, мое плохое, также в моем предложении для task_id не было необходимости (я просто придерживаюсь наших внутренних правил). Но ваш код на моей машине генерирует даг правильно, мне интересно, не связана ли ваша проблема не только со старой версией Airflow на вашем компьютере. - person mucio; 26.05.2020
comment
Я использую последнюю версию 1.10.10. может быть в моем развертывании есть потоки .. используете Helm Chart? не могли бы вы поделиться своим развертыванием / диаграммой? (только по возможности). В любом случае спасибо, mucio! - person DarkSpark; 26.05.2020
comment
Я использую внутренний мультитенантный Airflow. Мне также интересно, является ли dag, который пытается загрузить ваш Airflow, фактическим файлом, который вы изменяете, или более старой копией - person mucio; 26.05.2020
comment
Что меня действительно сводит с ума, так это то, что он выводит: `` ИНФОРМАЦИЯ - Использование исполнителя LocalExecutor``. Пытался изменить env var, но не получилось, может у вас есть идея? - person DarkSpark; 26.05.2020