Перемещение данных, обработка данных, обучение модели с помощью Azure Synapse Analytics Workspace
Унифицированная единая платформа для управления жизненным циклом Data и Data Science для повышения производительности
Примечание
- Эта статья предназначена для демонстрации функциональности
Предпосылка
- Учетная запись Azure
- Рабочая область Azure Synapse Analytics
- Рабочая область машинного обучения Azure
- База данных SQL Azure
- Хранилище Azure
- Загрузите данные Covid19 из kaggel и импортируйте в Azure SQL в качестве смоделированных данных из внешнего источника.
- Создайте одну таблицу как dbo.covid19data и загрузите данные
- Приведенный выше поток архитектуры состоит из 3 компонентов.
- Копировать действие для перемещения данных из внешнего источника и переноса в необработанную или входную зону.
- Далее следует действие потока данных для перетаскивания ETL/ELT для преобразования или обработки данных и перехода к окончательному варианту.
- Блокнот для запуска автоматизированного ML sdk с использованием spark для запуска модели машинного обучения
- Мы используем регрессию для прогнозирования смертей, происходящих из-за covid 19.
- Войдите в рабочую область Azure Synapse.
- Создать конвейер/интегрировать
- Перетаскивание копирования
- для источника выберите указанный выше SQL-сервер — (необходимо создать связанные службы)
- Затем создайте поток данных для обработки данных.
- Для примера я использую дельту, чтобы упростить CDC.
- Создайте источник с назначением действий копирования
- Мы можем возобновить связанную службу назначения действия по копированию самостоятельно.
- Теперь перетащите выделение
- Выбрать все столбцы
- Включите отладку и просмотрите результаты. Этот набор данных с открытым исходным кодом, поэтому нет PII
- Выберите столбцы
- См. отладку для проверки обработки данных.
- Теперь к машинному обучению
- Создать блокнот
- Получите доступ к вышеуказанному набору данных стока в качестве входных данных для моделирования.
- Создать блокнот
Код
- Загрузить данные
%%pyspark
df = spark.read.load('abfss://[email protected]/covid19aggroutput/*.parquet', format='parquet')
display(df.limit(10))
- Схема печати
df.printSchema
- импорт необходимо
from pyspark.sql.functions import *
from pyspark.sql import *
- преобразовать дату в тип данных даты
df1 = df.withColumn("Date", to_date("ObservationDate", "MM/dd/yyyy"))
display(df1)
- Создать новые столбцы
df2 = df1.withColumn("year", year(col("Date"))).withColumn("month", month(col("Date"))).withColumn("day", dayofmonth(col("Date")))
- Далее идет импорт библиотеки ML.
import azureml.core
from azureml.core import Experiment, Workspace, Dataset, Datastore from azureml.train.automl import AutoMLConfig from azureml.data.dataset_factory import TabularDatasetFactory
- Настройка рабочей области машинного обучения Azure.
- Установите конфигурацию для доступа к автоматизированному машинному обучению.
subscription_id = "xxxxxxxx" resource_group = "xxxxxx" workspace_name = "workspacename" experiment_name = "nameofsynapse-covid19aggr-20210913"
ws = Workspace(subscription_id = subscription_id, resource_group = resource_group, workspace_name = workspace_name) experiment = Experiment(ws, experiment_name)
- получить только необходимые столбцы
dffinal = df2[["year","month", "day", "Confirmed", "Deaths", "Recovered"]]
- Зарегистрируйте хранилище данных
datastore = Datastore.get_default(ws)
dataset = TabularDatasetFactory.register_spark_dataframe(df, datastore, name = experiment_name + "-dataset")
- Настройка автоматизированного мл
automl_config = AutoMLConfig(spark_context = sc,
task = "regression",
training_data = dataset,
label_column_name = "Deaths",
primary_metric = "spearman_correlation",
experiment_timeout_hours = 1,
max_concurrent_iterations = 4,
enable_onnx_compatible_models = False)
- Запустить эксперимент
run = experiment.submit(automl_config)
- Дождитесь завершения
run.wait_for_completion(show_output=False)
- Показать вывод
displayHTML("<a href={} target='_blank'>Your experiment in Azure Machine Learning portal: {}</a>".format(run.get_portal_url(), run.id))
- запустить эксперимент и задокументировать результат
run.wait_for_completion()
# Install required dependency import pip pip.main(["install", "azure-storage-blob==12.5.0"])
import mlflow
# Get best model from automl run best_run, non_onnx_model = run.get_output()
artifact_path = experiment_name + "_artifact"
mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri()) mlflow.set_experiment(experiment_name)
with mlflow.start_run() as run: # Save the model to the outputs directory for capture mlflow.sklearn.log_model(non_onnx_model, artifact_path)
# Register the model to AML model registry mlflow.register_model("runs:/" + run.info.run_id + "/" + artifact_path, "bbaccsynapse-covid19-20201217032226-Best")
- Сохраните конвейер и запустите отладку
Первоначально опубликовано на https://github.com.