Перемещение данных, обработка данных, обучение модели с помощью Azure Synapse Analytics Workspace

Унифицированная единая платформа для управления жизненным циклом Data и Data Science для повышения производительности

Примечание

  • Эта статья предназначена для демонстрации функциональности

Предпосылка

  • Учетная запись Azure
  • Рабочая область Azure Synapse Analytics
  • Рабочая область машинного обучения Azure
  • База данных SQL Azure
  • Хранилище Azure
  • Загрузите данные Covid19 из kaggel и импортируйте в Azure SQL в качестве смоделированных данных из внешнего источника.
  • Создайте одну таблицу как dbo.covid19data и загрузите данные

  • Приведенный выше поток архитектуры состоит из 3 компонентов.
  • Копировать действие для перемещения данных из внешнего источника и переноса в необработанную или входную зону.
  • Далее следует действие потока данных для перетаскивания ETL/ELT для преобразования или обработки данных и перехода к окончательному варианту.
  • Блокнот для запуска автоматизированного ML sdk с использованием spark для запуска модели машинного обучения
  • Мы используем регрессию для прогнозирования смертей, происходящих из-за covid 19.
  • Войдите в рабочую область Azure Synapse.
  • Создать конвейер/интегрировать
  • Перетаскивание копирования
  • для источника выберите указанный выше SQL-сервер — (необходимо создать связанные службы)

  • Затем создайте поток данных для обработки данных.
  • Для примера я использую дельту, чтобы упростить CDC.
  • Создайте источник с назначением действий копирования

  • Мы можем возобновить связанную службу назначения действия по копированию самостоятельно.
  • Теперь перетащите выделение

  • Выбрать все столбцы
  • Включите отладку и просмотрите результаты. Этот набор данных с открытым исходным кодом, поэтому нет PII

  • Выберите столбцы
  • См. отладку для проверки обработки данных.

  • Теперь к машинному обучению
  • Создать блокнот
  • Получите доступ к вышеуказанному набору данных стока в качестве входных данных для моделирования.
  • Создать блокнот

Код

  • Загрузить данные
%%pyspark
df = spark.read.load('abfss://[email protected]/covid19aggroutput/*.parquet', format='parquet')
display(df.limit(10))

  • Схема печати
df.printSchema
  • импорт необходимо
from pyspark.sql.functions import *
from pyspark.sql import *
  • преобразовать дату в тип данных даты
df1 = df.withColumn("Date", to_date("ObservationDate", "MM/dd/yyyy")) 
display(df1)

  • Создать новые столбцы
df2 = df1.withColumn("year", year(col("Date"))).withColumn("month", month(col("Date"))).withColumn("day", dayofmonth(col("Date")))

  • Далее идет импорт библиотеки ML.
import azureml.core
from azureml.core import Experiment, Workspace, Dataset, Datastore
from azureml.train.automl import AutoMLConfig
from azureml.data.dataset_factory import TabularDatasetFactory
  • Настройка рабочей области машинного обучения Azure.
  • Установите конфигурацию для доступа к автоматизированному машинному обучению.
subscription_id = "xxxxxxxx"
resource_group = "xxxxxx"
workspace_name = "workspacename"
experiment_name = "nameofsynapse-covid19aggr-20210913"
ws = Workspace(subscription_id = subscription_id, resource_group = resource_group, workspace_name = workspace_name)
experiment = Experiment(ws, experiment_name)
  • получить только необходимые столбцы
dffinal = df2[["year","month", "day", "Confirmed", "Deaths", "Recovered"]]
  • Зарегистрируйте хранилище данных
datastore = Datastore.get_default(ws)
dataset = TabularDatasetFactory.register_spark_dataframe(df, datastore, name = experiment_name + "-dataset")
  • Настройка автоматизированного мл
automl_config = AutoMLConfig(spark_context = sc,
                             task = "regression",
                             training_data = dataset,
                             label_column_name = "Deaths",
                             primary_metric = "spearman_correlation",
                             experiment_timeout_hours = 1,
                             max_concurrent_iterations = 4,
                             enable_onnx_compatible_models = False)
  • Запустить эксперимент
run = experiment.submit(automl_config)
  • Дождитесь завершения
run.wait_for_completion(show_output=False)
  • Показать вывод
displayHTML("<a href={} target='_blank'>Your experiment in Azure Machine Learning portal: {}</a>".format(run.get_portal_url(), run.id))
  • запустить эксперимент и задокументировать результат
run.wait_for_completion()
# Install required dependency
import pip
pip.main(["install", "azure-storage-blob==12.5.0"])
import mlflow
# Get best model from automl run
best_run, non_onnx_model = run.get_output()
artifact_path = experiment_name + "_artifact"
mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri())
mlflow.set_experiment(experiment_name)
with mlflow.start_run() as run:
    # Save the model to the outputs directory for capture
    mlflow.sklearn.log_model(non_onnx_model, artifact_path)
    # Register the model to AML model registry
    mlflow.register_model("runs:/" + run.info.run_id + "/" + artifact_path, "bbaccsynapse-covid19-20201217032226-Best")
  • Сохраните конвейер и запустите отладку

Первоначально опубликовано на https://github.com.