Передача данных между пользовательскими операторами воздушного потока

Я создаю свои собственные пользовательские операторы в воздушном потоке и хотел бы использовать вывод одного оператора в качестве ввода другого оператора. В настоящее время я сохраняю вывод в s3 и читаю их из s3 в следующем операторе, что кажется эффективным способом. Я наткнулся на следующий пост Airflow и передача данных между операторами но я не мог понять это совершенно ясно. Я был бы очень признателен, если бы кто-нибудь мог привести мне хороший пример передачи данных между операторами.

class SFScoreDataOperator(BaseOperator):
    @apply_defaults
    def __init__(self, aws_key, aws_secret,
                 model_version='2020-04-v2',
                 *args, **kwargs):
        self.__version__ = model_version
        self.aws_key = aws_key
        self.aws_secret = aws_secret
        super(SFScoreDataOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        SuccessFactorData(aws_key=self.aws_key, aws_secret=self.aws_secret, to_random_shuffle=False).get_data()


class SFScoreGetTrainOperator(BaseOperator):
    @apply_defaults
    def __init__(self, aws_key, aws_secret,
                 model_version='2020-04-v2',
                 *args, **kwargs):
        self.__version__ = model_version
        self.aws_key = aws_key
        self.aws_secret = aws_secret
        super(SFScoreGetTrainOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        CM = SuccessFactorTrain(aws_key=self.aws_key, aws_secret=self.aws_secret)
        df_to_predict = CM.fetch_predict_data()
        df_train_test = CM.fetch_final_data()
        CM.train_test(final_df_for_train_test=df_train_test, segment_type=None)

class SuccessFactorDataPluginV2(AirflowPlugin):
    name = 'success_factor_data_plugin_v2'
    operators = [SFScoreDataOperator]


class SuccessFactorTrainPluginV2(AirflowPlugin):
    name = 'success_factor_train_plugin_v2'
    operators = [SFScoreGetTrainOperator]

Метод класса get_data в классе SFScoreDataOperator выводит две таблицы, которые являются входными данными метода train_test в SFScoreGetTrainOperator, а метод train_test выводит 3 переменные, которые будут входными данными для следующего пользовательского оператора. Не все выходные данные в формате CSV, поэтому я не могу записать их в s3. Я прочитал документацию XCom, но не уверен, как это реализовать, поэтому был бы очень признателен, если бы я мог получить толчок. Спасибо!


person akash bachu    schedule 26.04.2020    source источник