Блок-схема, чтобы показать процесс

Пример кода ниже. импорт, необходимый для работы кода.

import os 
import urllib 
import shutil 
import azureml 
import pandas as pd 
from azureml.core import Experiment 
from azureml.core import Workspace, Run 
from azureml.core.compute import ComputeTarget, AmlCompute 
from azureml.core.compute_target import ComputeTargetException

Давайте настроим рабочую область, чтобы ноутбуки знали, какую рабочую область использовать.

ws = Workspace.from_config()

Установите необходимые папки проекта. Используется временная папка.

project_folder = './test-project' os.makedirs(project_folder, exist_ok=True) experiment = Experiment(workspace=ws, name='test-model')
output_folder = './outputs' os.makedirs(output_folder, exist_ok=True)
result_folder = './results' os.makedirs(result_folder, exist_ok=True)

Установите кластер, который модель будет использовать для оценки. Создайте новый кластер или используйте существующий кластер.

from azureml.core.compute import ComputeTarget, AmlCompute 
from azureml.core.compute_target import ComputeTargetException 
# Choose a name for your CPU cluster 
cpu_cluster_name = "testcluster" 
# Verify that cluster does not exist already 
try: 
    cpu_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
    print('Found existing cluster, use it.') 
except ComputeTargetException: 
    compute_config= AmlCompute.provisioning_configuration(vm_size='STANDARD_D14_V2', max_nodes=4) 
   cpu_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)
   cpu_cluster.wait_for_completion(show_output=True)

Выберите образ по умолчанию и зависимости для работы вышеуказанного кластера.

from azureml.core.runconfig import RunConfiguration 
from azureml.core.conda_dependencies import CondaDependencies 
from azureml.core.runconfig import DEFAULT_CPU_IMAGE 
# Create a new runconfig object 
run_amlcompute = RunConfiguration() 
# Use the cpu_cluster you created above. 
run_amlcompute.target = cpu_cluster 
# Enable Docker 
run_amlcompute.environment.docker.enabled = True 
# Set Docker base image to the default CPU-based image run_amlcompute.environment.docker.base_image = DEFAULT_CPU_IMAGE 
# Use conda_dependencies.yml to create a conda environment in the Docker image for execution run_amlcompute.environment.python.user_managed_dependencies = False # Specify CondaDependencies obj, add necessary packages run_amlcompute.environment.python.conda_dependencies = CondaDependencies.create(conda_packages=['scikit-learn'])

Напишите файл пакетной оценки — здесь логика того, как использовать модель с новыми данными и оценивать входные данные для прогнозирования.

Процесс пакетного скорингового файла заключается в загрузке всех необходимых библиотек. Затем загрузите модель, которую мы собираемся использовать.

После загрузки модели основная функция запуска будет принимать каждый файл в качестве параметра и оценки. Каждый файл может быть парализован одним узлом для горизонтального масштабирования. Функция запуска открывает каждый файл, затем прогнозирует и сохраняет вывод.

%%writefile batch_scoring.py 
import io 
import pickle 
import argparse 
import numpy as np 
import pandas as pd 
import joblib 
import os 
import urllib 
import shutil 
import azureml
 
from azureml.core.model import Model 
from sklearn.linear_model import LogisticRegression 
from sklearn.ensemble import RandomForestClassifier 
def init():
    global test_model

    model_path = Model.get_model_path("sklearn_test)

    #model_path = Model.get_model_path(args.model_name)
    with open(model_path, 'rb') as model_file:
        test_model = joblib.load(model_file)


def run(mini_batch):
    # make inference    
    print(f'run method start: {__file__}, run({mini_batch})')
    resultList = []
    for file in mini_batch:
        input_data = pd.read_parquet(file, engine='pyarrow')
        num_rows, num_cols = input_data.shape
        X = input_data[[col for col in input_data.columns if "encoding" in col]]
        y = input_data['test_flag']

        X = X[[col for col in X.columns if col not in ["xxxx_premium_range_encoding", "xyz_encoding"]]]
        pred = test_model.predict(X).reshape((num_rows, 1))

    # cleanup output
    #result = input_data.drop(input_data.columns[4:], axis=1)
    result = X
    result['variety'] = pred

    return result

Время для установки источника входных данных, который будет использоваться для оценки.

from azureml.core.datastore import Datastore

batchscore_blob = Datastore.register_azure_blob_container(ws, 
                      datastore_name="inputdatastore", 
                      container_name="containername", 
                      account_name="storageaccoutnname",
                      account_key="xxxxx",
                      overwrite=True)

def_data_store = ws.get_default_datastore()

Источник выходных данных, где хранить выходной файл с оценками.

from azureml.core.datastore import Datastore batchscore_blob_out = Datastore.register_azure_blob_container(ws, datastore_name="input_datastore", container_name="containername", account_name="storageaccountname", account_key="xxxxxxx", overwrite=True) def_data_store_out = ws.get_default_datastore()

Настройте конвейер с наборами данных.

from azureml.core.datastore import Datastore

batchscore_blob_out = Datastore.register_azure_blob_container(ws, 
                      datastore_name="input_datastore", 
                      container_name="containername", 
                      account_name="storageaccountname", 
                      account_key="xxxxxxx",
                      overwrite=True)

def_data_store_out = ws.get_default_datastore()
from azureml.core.dataset import Dataset
from azureml.pipeline.core import PipelineData

input_ds = Dataset.File.from_files((batchscore_blob, "/"))
output_dir = PipelineData(name="scores", 
                          datastore=def_data_store_out, 
                          output_path_on_compute="results")
#Set Environments
from azureml.core import Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.runconfig import DEFAULT_CPU_IMAGE

cd = CondaDependencies.create(pip_packages=["scikit-learn", "azureml-defaults","pyarrow"])
env = Environment(name="parallelenv")
env.python.conda_dependencies = cd
env.docker.base_image = DEFAULT_CPU_IMAGE

Настройка параметров конвейера

from azureml.pipeline.core import PipelineParameter
from azureml.pipeline.steps import ParallelRunConfig

parallel_run_config = ParallelRunConfig(
    #source_directory=scripts_folder,
    entry_script="batch_scoring.py",
    mini_batch_size='1',
    error_threshold=2,
    output_action='append_row',
    append_row_file_name="test_outputs.txt",
    environment=env,
    compute_target=cpu_cluster, 
    node_count=3,
    run_invocation_timeout=120)

Настройка шагов конвейера

from azureml.pipeline.steps import ParallelRunStep

batch_score_step = ParallelRunStep(
    name="parallel-step-test",
    inputs=[input_ds.as_named_input("input_ds")],
    output=output_dir,
    #models=[model],
    parallel_run_config=parallel_run_config,
    #arguments=['--model_name', 'sklearn_touring'],
    allow_reuse=True
)

Выполнить трубопровод.

from azureml.core import Experiment
from azureml.pipeline.core import Pipeline

pipeline = Pipeline(workspace=ws, steps=[batch_score_step])
pipeline_run = Experiment(ws, 'batch_scoring').submit(pipeline)
pipeline_run.wait_for_completion(show_output=True)

Показать статус запуска эксперимента

from azureml.widgets import RunDetails
RunDetails(pipeline_run).show()

Конец работы конвейера.

Для получения вывода и отладки щелкните ссылку https://docs.microsoft.com/en-us/azure/machine-learning/how-to-debug-parallel-run-step.

Первоначально опубликовано на https://github.com.