xcomm vales от subdag до dag in airflow composer

Прежде чем я начну, я прошу прощения, так как этот тип вопросов задавался раньше, но мне все еще трудно понять, как выполнить сценарий ниже.

Я начал профессионально работать с воздушным потоком и кодированием на Python в течение месяца, поэтому, пожалуйста, игнорируйте ужасно написанную функцию Python, но она в основном принимает имя строкового файла и возвращает строковое значение, которое я могу использовать для дельт.

шаги: я хочу поместить все файлы в корзину с префиксом ABC и перебрать их.

подход: ниже код

#!/usr/bin/env python

"""

"""
from datetime import datetime, timedelta
from airflow import DAG
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator
from airflow.contrib.operators.gcs_list_operator import GoogleCloudStorageListOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators import PythonOperator
#from airflow.contrib.hooks import gcs_hook
from airflow.contrib.sensors.gcs_sensor import GoogleCloudStoragePrefixSensor
from airflow.contrib.operators.bigquery_table_delete_operator import BigQueryTableDeleteOperator
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator
#import GenDeltaDate
from datetime import datetime
#from airflow.operators import InvalidDataFilterOperator

YESTERDAY = datetime.combine(
    datetime.today() - timedelta(days=1), datetime.min.time())
BQ_DATASET_NAME = 'Master'
CURRENT_TIME = datetime


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': YESTERDAY,
    #'email': [],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'provide_context': True,
    'dataflow_default_options': {
        'project': 'project',
        'zone': 'us-east1-f'
    }
}
files_to_process = ['abc']
bucket = 'bucket_name'


def pull(**context):
            archive(context['ti'].xcom_pull(task_ids='list_files'))

import re
def gen_delta_date(input_file,**kwargs):
    # stepl1: check for file extension and remove it
    idx_extension           = input_file.find(".")
    input_file_name         = input_file[:idx_extension]
    #check for 3 pairs of numeric values sperated by underscores and grab that value.
    find_date_time_part     = re.findall("_(\d*?_\d*?_\d*)",input_file_name)
    #massaging the value by removing unneeded char's
    find_date_time_part     = str(find_date_time_part).split('_', 1)[-1].strip(']')
    find_date_time_part     = str(find_date_time_part)
    find_date_time_part     = re.sub("'",'', find_date_time_part)
    find_date_time_part_len = len(find_date_time_part)
    '''
    to-do:
    1. need to remove hard coded length value and pass as a parameter.

    '''
    if find_date_time_part_len == 15:
        #Splitting the transformed input file name based on _ and save it into a list
        x = [a for a in find_date_time_part.split('_') if a]
        #get the date time part from the list i.e split at underscore
        x = (' '.join(x[-2:]))
        #print(x)
        #Using strptime to parse the string value as datetime object here our date format is YYYYMMDD hhmiss
        dt_obj = datetime.strptime(x, "%Y%m%d %H%M%S")
        # use strftime to format the date object into desired format in our case YYYY-MM-DD hh:mi:ss
        final_date_formatted = dt_obj.strftime("%Y-%m-%d %H:%M:%S")
        #print(type(find_date_time_part))
        return final_date_formatted
    else:
        print("Error: Input filename does not match the naming conventions:The input file naming format shoud be *xx_YYYYMMDD_hhmiss for proper parsing xx is numeric value here {0}_{1}".format(find_date_time_part_len,input_file))

with DAG('Test', default_args=default_args,
    schedule_interval=None,
) as dag:
    for item in files_to_process:
    #########################################################################
    #########################################################################
    ##############List the files in the bucket###############################
    #########################################################################
    #########################################################################
        GCS_File_list = GoogleCloudStorageListOperator(
                    task_id= 'list_files',
                    bucket= bucket,
                    prefix='ABC',
                    delimiter='.csv',
                    google_cloud_storage_conn_id='google_cloud_default',
                    #provide_context = True,
                    dag = dag
                )

        for  idx, file in enumerate(["{{ ti.xcom_pull(task_ids='list_files') }}"]):
            #print(idx)
            #print(file)
            Python_Task = PythonOperator(
                             task_id=item+'_pass_date',
                             provide_context=True,
                             python_callable=gen_delta_date,
                             op_kwargs={'input_file':file},
                             trigger_rule=TriggerRule.ALL_SUCCESS,
                             #provide_context = True,
                             #xcom_push=True,
                             dag=dag
                            )
            sql_task = BigQueryOperator(
                       task_id='query',
                       sql='test.sql',
                       destination_dataset_table='{0}.list_test'.format(BQ_DATASET_NAME),
                       bigquery_conn_id='bigquery_default',
                       use_legacy_sql=False,
                       trigger_rule=TriggerRule.ALL_SUCCESS,
                       provide_context=True,
                       create_disposition = 'CREATE_IF_NEEDED',
                       write_disposition = 'WRITE_APPEND'
                      )
#Orchestration.
GCS_File_list >> Python_Task >> sql_task

Но после проверки я вижу, что имя файла, переданное функции python, не является шаблоном и передается как строка xcom.pull

после проведения некоторого исследования и обнаружения точно такого же кода, а также причин, указывающих, почему это не работает. ссылка: [воздушный поток не может перебирать список xcom_pull с помощью GoogleCloud Operatos

В приведенном выше сообщении было упомянуто использование вложенных тегов и достижение функциональности, но скажем, если у меня есть задача GCS_File_list в качестве вложенного тега, как мне вернуть значения в виде списка обратно в мой основной тег, а затем использовать список файлов и затем я могу перебрать файлы для запуска Python_Task и sql_task.

Насколько я понимаю, я должен использовать "{{ti.xcom_pull (task_ids = 'list_files')}}" внутри оператора, а не то, что я сделал в приведенном выше коде (для idx, файл в перечислении (["{{ ti.xcom_pull (task_ids = 'list_files')}} "]) то как я могу сохранить значение в виде списка.

Любые указатели или советы очень ценятся.

Спасибо.


person kumarm    schedule 10.09.2019    source источник


Ответы (1)


Привет, я использовал совершенно другой подход, чтобы решить эту проблему для любого интересующего цикла вопрос о переменных расхода воздуха

С Уважением.

person kumarm    schedule 12.09.2019
comment
Не самый элегантный способ, не уверен, что этот метод предписан, нужно много проверять, но на поверхности работает хорошо - person kumarm; 12.09.2019