Я создаю свои собственные пользовательские операторы в воздушном потоке и хотел бы использовать вывод одного оператора в качестве ввода другого оператора. В настоящее время я сохраняю вывод в s3 и читаю их из s3 в следующем операторе, что кажется эффективным способом. Я наткнулся на следующий пост Airflow и передача данных между операторами но я не мог понять это совершенно ясно. Я был бы очень признателен, если бы кто-нибудь мог привести мне хороший пример передачи данных между операторами.
class SFScoreDataOperator(BaseOperator):
@apply_defaults
def __init__(self, aws_key, aws_secret,
model_version='2020-04-v2',
*args, **kwargs):
self.__version__ = model_version
self.aws_key = aws_key
self.aws_secret = aws_secret
super(SFScoreDataOperator, self).__init__(*args, **kwargs)
def execute(self, context):
SuccessFactorData(aws_key=self.aws_key, aws_secret=self.aws_secret, to_random_shuffle=False).get_data()
class SFScoreGetTrainOperator(BaseOperator):
@apply_defaults
def __init__(self, aws_key, aws_secret,
model_version='2020-04-v2',
*args, **kwargs):
self.__version__ = model_version
self.aws_key = aws_key
self.aws_secret = aws_secret
super(SFScoreGetTrainOperator, self).__init__(*args, **kwargs)
def execute(self, context):
CM = SuccessFactorTrain(aws_key=self.aws_key, aws_secret=self.aws_secret)
df_to_predict = CM.fetch_predict_data()
df_train_test = CM.fetch_final_data()
CM.train_test(final_df_for_train_test=df_train_test, segment_type=None)
class SuccessFactorDataPluginV2(AirflowPlugin):
name = 'success_factor_data_plugin_v2'
operators = [SFScoreDataOperator]
class SuccessFactorTrainPluginV2(AirflowPlugin):
name = 'success_factor_train_plugin_v2'
operators = [SFScoreGetTrainOperator]
Метод класса get_data в классе SFScoreDataOperator выводит две таблицы, которые являются входными данными метода train_test в SFScoreGetTrainOperator, а метод train_test выводит 3 переменные, которые будут входными данными для следующего пользовательского оператора. Не все выходные данные в формате CSV, поэтому я не могу записать их в s3. Я прочитал документацию XCom, но не уверен, как это реализовать, поэтому был бы очень признателен, если бы я мог получить толчок. Спасибо!