Мне нужно получить вывод команды bash (размером с файл) в SSHOperator. Я буду использовать это значение в качестве проверки условия для перехода к другим задачам. Я использую xcom, чтобы попытаться получить значение, и оператор branchpython для обработки решения, но безуспешно. Похоже, что SSHOperator не приносит пользы xcom.
Ниже приведен мой код:
#Required packages to execute DAG
from __future__ import print_function
import logging
from airflow.models import DAG
from airflow.models import Variable
from datetime import datetime, timedelta
from airflow.contrib.operators.ssh_operator import SSHOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.utils.trigger_rule import TriggerRule
log = logging.getLogger(__name__)
def decision_function(**context):
ti = context['ti']
fileSize = ti.xcom_pull(task_ids='get_param')
log.info('File size is: {}'.format(fileSize))
if fileSize >= 800000:
return 'good_path'
else:
return 'bad_path'
# DAG parameters
default_args = {
'owner': 'Me',
'depends_on_past': False,
'start_date': datetime(2020, 8, 17, 4, 15),
'dagrun_timeout': None,
'email': Variable.get('email'),
'email_on_failure': False,
'email_on_retry': False,
'provide_context': True,
'orientation': 'LR'#, TB, RL, BT)
}
# create DAG object with Name and default_args
with DAG(
'a_param',
schedule_interval=None,
description='params',
default_args=default_args
) as dag:
# Define tasks
begin = DummyOperator(
task_id='begin',
dag=dag
)
get_param = SSHOperator(
ssh_conn_id="oci_connection",
task_id='get_param',
xcom_push=True,
command="ls -ltr /tmp/adobegc.log | awk '{print $5}'",
dag=dag)
check_file = BranchPythonOperator(
task_id='check_file',
python_callable=decision_function,
provide_context=True,
dag=dag)
good_path = DummyOperator(
task_id='good_path',
trigger_rule=TriggerRule.NONE_FAILED,
dag=dag
)
bad_path = DummyOperator(
task_id='bad_path',
trigger_rule=TriggerRule.NONE_FAILED,
dag=dag
)
begin >> get_param >> check_file
check_file >> good_path
check_file >> bad_path
Задача check_fail завершается со следующим журналом:
[2020-10-12 16:58:20,572] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file /usr/local/lib/airflow/airflow/contrib/operators/ssh_operator.py:75: PendingDeprecationWarning: Invalid arguments were passed to SSHOperator (task_id: get_param). Support for passing such arguments will be dropped in Airflow 2.0. Invalid arguments were:
[2020-10-12 16:58:20,572] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file *args: ()
[2020-10-12 16:58:20,572] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file **kwargs: {'xcom_push': True}
[2020-10-12 16:58:20,573] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file super(SSHOperator, self).__init__(*args, **kwargs)
[2020-10-12 16:58:20,906] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file Running <TaskInstance: a_param.check_file 2020-10-12T16:55:18.312240+00:00 [running]> on host airflow-worker-9b6fbd84c-l4jbs
[2020-10-12 16:58:20,990] {base_task_runner.py:113} INFO - Job 111304: Subtask check_file [2020-10-12 16:58:20,989] {a_param.py:25} INFO - File size is: None
[2020-10-12 16:58:20,990] {taskinstance.py:1135} ERROR - '>=' not supported between instances of 'NoneType' and 'int'
Traceback (most recent call last)
File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 972, in _run_raw_tas
result = task_copy.execute(context=context
File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 141, in execut
branch = super(BranchPythonOperator, self).execute(context
File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 113, in execut
return_value = self.execute_callable(
File "/usr/local/lib/airflow/airflow/operators/python_operator.py", line 118, in execute_callabl
return self.python_callable(*self.op_args, **self.op_kwargs
File "/home/airflow/gcs/dags/a_param.py", line 26, in decision_functio
if fileSize >= 800000
TypeError: '>=' not supported between instances of 'NoneType' and 'int
Фрагмент журнала задач get_param
[2020-10-12 16:57:12,113] {base_task_runner.py:113} INFO - Job 111303: Subtask get_param [2020-10-12 16:57:12,112] {ssh_operator.py:109} INFO - Running command: ls -ltr /tmp/adobegc.log | awk '{print $5}'
[2020-10-12 16:57:12,549] {base_task_runner.py:113} INFO - Job 111303: Subtask get_param [2020-10-12 16:57:12,547] {ssh_operator.py:143} INFO - 516752
xcom_push
в своем SSHOperator наdo_xcom_push
. Проверяя страницу xcom, я не получаю ожидаемого результата. Я получаюkey: return_value ; Value:ODAwMAo=
. Я ожидаю размер файла под значением. - person comet   schedule 14.10.20208000
. - person Philipp Johannis   schedule 14.10.2020return b64encode(agg_stdout).decode('utf-8')
- source Я думаю, вы можете просто расшифровать его снова, как только снова прочитаете его значение. - person Philipp Johannis   schedule 14.10.2020int(base64.b64decode(fileSize).decode('utf-8'))
- person comet   schedule 14.10.2020