Унифицированный аналитический инструмент для приема, вычисления или обработки данных, хранения данных, расширенной аналитики или машинного обучения и отображения всего в одном инструменте. Платформа сквозной аналитики данных, созданная для масштабирования и простоты использования.

Расширенная аналитика Syanpse

Synapse может запускать код на основе искры, что приводит к разработке данных или разработке функций, а также к машинному обучению. В этой статье описывается, как обучить модель машинного обучения с помощью искры в синапсе.

Предпосылка

Загрузите приведенный выше обучающий файл в хранилище BLOB-объектов или в ADLS Gen2. Или вы можете использовать функцию оркестровки синапсов, чтобы переместить данные в большой двоичный объект.

For my testing i was able to move the blob storage train.csv into ADLS gen2 filesystem. I did that for just to show how to move data inside synapse analytics.

Сначала давайте подключимся к хранилищу данных, чтобы получить данные

Это нужно для запуска искровой версии.

%%pyspark import pyspark 
print(print(pyspark.__version__))

Теперь давайте настроим хранилище BLOB-объектов для получения данных:

spark.conf.set( "fs.azure.account.key.waginput.blob.core.windows.net", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")

Считайте файл csv во фрейм данных

val df = spark.read.option("header","true").option("inferSchema","true").csv("wasbs://[email protected]/train.csv")

Распечатайте схему:

df.printSchema

Установите список функций для моделирования машинного обучения:

import org.apache.spark.ml.feature.VectorAssembler 
import org.apache.spark.ml.linalg.Vectors 
val featureCols=Array("fare_amount","pickup_longitude","pickup_latitude","dropoff_longitude","dropoff_latitude","passenger_count") 
val assembler: org.apache.spark.ml.feature.VectorAssembler= new VectorAssembler().setInputCols(featureCols).setOutputCol("features") val assembledDF = assembler.setHandleInvalid("skip").transform(df) val assembledFinalDF = assembledDF.select("fare_amount","features")

Нормализуйте фрейм данных:

import org.apache.spark.ml.feature.Normalizer 
val normalizedDF = new Normalizer().setInputCol("features").setOutputCol("normalizedFeatures").transform(assembledFinalDF)

Отбросьте недостающую точку данных во фрейме данных:

val normalizedDF1 = normalizedDF.na.drop()

Разделить данные для обучения и тестирования

val Array(trainingData, testData) = normalizedDF1.randomSplit(Array(0.7, 0.3))

включает в себя для моделирования

import org.apache.spark.ml.Pipeline 
import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier} 
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
// Index labels, adding metadata to the label column. 
// Fit on whole dataset to include all labels in index. 
val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(normalizedDF1)
// Automatically identify categorical features, and index them. 
// Set maxCategories so features with > 4 distinct values are treated as continuous. 
val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(normalizedDF1)

Установите параметры модели GBT и входные данные

// Train a GBT model. val gbt = new GBTClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(10)

Преобразовать ярлык обратно

// Convert indexed labels back to original labels. val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)

Создайте конвейер для запуска обучения модели

// Chain indexers and GBT in a Pipeline. 
val pipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, gbt, labelConverter))
// Train model. This also runs the indexers. 
val model = pipeline.fit(trainingData)
// Make predictions. 
val predictions = model.transform(testData)

Распечатать 5 лучших прогнозов

// Select example rows to display. predictions.select("predictedLabel", "label", "features").show(5)

запустить оценщик.

// Select (prediction, true label) and compute test error. val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy") 
val accuracy = evaluator.evaluate(predictions) 
println("Test Error = " + (1.0 - accuracy)) 
val gbtModel = model.stages(2).asInstanceOf[GBTClassificationModel] println("Learned classification GBT model:\n" + gbtModel.toDebugString)

Первоначально опубликовано на https://github.com.