Kubeflow Pipelines: RuntimeError при запуске нескольких компонентов Trainer в TFX

Я хотел бы, чтобы несколько компонентов Trainer работали одновременно, используя одни и те же ExampleGen, Schema и Transform. Ниже приведён мой код, в который я добавил дополнительные компоненты Trainer2, Evaluator2 и Pusher2. Но я получаю следующую ошибку и не понимаю, как её исправить. Подскажите, пожалуйста, заранее спасибо!

Ошибка: `RuntimeError: Duplicated component_id Trainer for component type tfx.components.trainer.component.Trainer`

def create_pipeline(
    pipeline_name: Text,
    pipeline_root: Text,
    data_path: Text,
    preprocessing_fn: Text,
    run_fn: Text,
    run_fn2: Text,
    train_args: trainer_pb2.TrainArgs,
    train_args2: trainer_pb2.TrainArgs,
    eval_args: trainer_pb2.EvalArgs,
    eval_args2: trainer_pb2.EvalArgs,
    eval_accuracy_threshold: float,
    eval_accuracy_threshold2: float,
    serving_model_dir: Text,
    serving_model_dir2: Text,
    metadata_connection_config: Optional[
        metadata_store_pb2.ConnectionConfig] = None,
    beam_pipeline_args: Optional[List[Text]] = None,
    ai_platform_training_args: Optional[Dict[Text, Text]] = None,
    ai_platform_serving_args: Optional[Dict[Text, Any]] = None,
) -> pipeline.Pipeline:
  """Build a TFX pipeline that trains, evaluates and pushes two models.

  One ExampleGen -> StatisticsGen -> SchemaGen -> Transform prefix is shared
  by two parallel Trainer -> Evaluator -> Pusher branches.

  Two components of the same type with no ``instance_name`` get the same
  component id. ``pipeline.Pipeline`` then fails with
  ``RuntimeError: Duplicated component_id ...``. The second Trainer,
  Evaluator and Pusher therefore get an explicit ``instance_name``. The first
  branch keeps its default ids, so metadata recorded by earlier runs stays
  attached to the same components.

  NOTE(review): newer TFX releases replace ``instance_name`` with
  ``component.with_id(...)``; confirm against the installed TFX version.

  Args:
    pipeline_name: Name of the pipeline.
    pipeline_root: Root directory for pipeline outputs.
    data_path: Input location passed to CsvExampleGen via external_input.
    preprocessing_fn: Preprocessing function reference passed to Transform.
    run_fn: Run function for the first Trainer.
    run_fn2: Run function for the second Trainer.
    train_args: TrainArgs for the first Trainer.
    train_args2: TrainArgs for the second Trainer.
    eval_args: EvalArgs for the first Trainer.
    eval_args2: EvalArgs for the second Trainer.
    eval_accuracy_threshold: Not used in this body; presumably meant to
      build ``eval_config`` (TODO confirm).
    eval_accuracy_threshold2: Not used in this body; presumably meant to
      build ``eval_config2`` (TODO confirm).
    serving_model_dir: Filesystem push destination for the first model.
    serving_model_dir2: Filesystem push destination for the second model.
    metadata_connection_config: Optional ML Metadata connection config.
    beam_pipeline_args: Optional Beam pipeline arguments.
    ai_platform_training_args: Accepted but not used in this body.
    ai_platform_serving_args: Accepted but not used in this body.

  Returns:
    The assembled ``pipeline.Pipeline``.
  """
  # Local import so the block does not depend on the module-level import list;
  # SchemaGen below consumes StatisticsGen's output.
  from tfx.components import StatisticsGen  # pylint: disable=g-import-not-at-top

  components = []
  example_gen = CsvExampleGen(input=external_input(data_path))
  components.append(example_gen)

  statistics_gen = StatisticsGen(examples=example_gen.outputs['examples'])
  components.append(statistics_gen)

  schema_gen = SchemaGen(
      statistics=statistics_gen.outputs['statistics'],
      infer_feature_shape=False)
  components.append(schema_gen)

  transform = Transform(
      examples=example_gen.outputs['examples'],
      schema=schema_gen.outputs['schema'],
      preprocessing_fn=preprocessing_fn)
  components.append(transform)

  # ---- Branch 1: default component ids (Trainer, Evaluator, Pusher). ----
  trainer_args = {
      'run_fn': run_fn,
      'transformed_examples': transform.outputs['transformed_examples'],
      'schema': schema_gen.outputs['schema'],
      'transform_graph': transform.outputs['transform_graph'],
      'train_args': train_args,
      'eval_args': eval_args,
      'custom_executor_spec':
          executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
  }
  trainer = Trainer(**trainer_args)
  components.append(trainer)

  # ---- Branch 2: instance_name makes each component id unique. ----
  trainer_args2 = {
      'run_fn': run_fn2,
      'transformed_examples': transform.outputs['transformed_examples'],
      'schema': schema_gen.outputs['schema'],
      'transform_graph': transform.outputs['transform_graph'],
      'train_args': train_args2,
      'eval_args': eval_args2,
      'custom_executor_spec':
          executor_spec.ExecutorClassSpec(trainer_executor.GenericExecutor),
      'instance_name': 'trainer2',
  }
  trainer2 = Trainer(**trainer_args2)
  components.append(trainer2)

  # The two resolvers already have distinct instance names.
  # NOTE(review): both are configured identically (same resolver class, same
  # Model/ModelBlessing channel types), so they may resolve the same artifact,
  # possibly one produced by the other branch. Confirm this is intended.
  model_resolver = ResolverNode(
      instance_name='latest_blessed_model_resolver',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))
  components.append(model_resolver)

  model_resolver2 = ResolverNode(
      instance_name='latest_blessed_model_resolver2',
      resolver_class=latest_blessed_model_resolver.LatestBlessedModelResolver,
      model=Channel(type=Model),
      model_blessing=Channel(type=ModelBlessing))
  components.append(model_resolver2)

  # NOTE(review): eval_config and eval_config2 are not defined in this
  # function. They must come from module scope (presumably built from
  # eval_accuracy_threshold / eval_accuracy_threshold2). Confirm.
  evaluator = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer.outputs['model'],
      # Baseline comparison is disabled for this branch, so model_resolver's
      # output is not consumed. Change thresholds are ignored without a
      # baseline.
      # baseline_model=model_resolver.outputs['model'],
      eval_config=eval_config)
  components.append(evaluator)

  evaluator2 = Evaluator(
      examples=example_gen.outputs['examples'],
      model=trainer2.outputs['model'],
      baseline_model=model_resolver2.outputs['model'],
      # Change threshold will be ignored if there is no baseline (first run).
      eval_config=eval_config2,
      instance_name='evaluator2')
  components.append(evaluator2)

  pusher_args = {
      'model':
          trainer.outputs['model'],
      'model_blessing':
          evaluator.outputs['blessing'],
      'push_destination':
          pusher_pb2.PushDestination(
              filesystem=pusher_pb2.PushDestination.Filesystem(
                  base_directory=serving_model_dir)),
  }
  pusher = Pusher(**pusher_args)
  components.append(pusher)

  pusher_args2 = {
      'model':
          trainer2.outputs['model'],
      'model_blessing':
          evaluator2.outputs['blessing'],
      'push_destination':
          pusher_pb2.PushDestination(
              filesystem=pusher_pb2.PushDestination.Filesystem(
                  base_directory=serving_model_dir2)),
      'instance_name': 'pusher2',
  }
  pusher2 = Pusher(**pusher_args2)
  components.append(pusher2)

  # The signature promises a pipeline.Pipeline; without this the function
  # returned None.
  return pipeline.Pipeline(
      pipeline_name=pipeline_name,
      pipeline_root=pipeline_root,
      components=components,
      metadata_connection_config=metadata_connection_config,
      beam_pipeline_args=beam_pipeline_args,
  )

— LLTeng, 20.05.2020 (источник)


Ответы (1)


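Проблема решена добавлением параметра `instance_name` к компонентам конвейера, чтобы у каждого из них был уникальный component_id. Например: `Trainer(..., instance_name='trainer2')`, `Evaluator(..., instance_name='evaluator2')`, `Pusher(..., instance_name='pusher2')`.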

— LLTeng, 28.05.2020