Унифицированный аналитический инструмент для приема, вычисления или обработки данных, хранения данных, расширенной аналитики или машинного обучения и отображения всего в одном инструменте. Платформа сквозной аналитики данных, созданная для масштабирования и простоты использования.
Расширенная аналитика Syanpse
Synapse может запускать код на основе искры, что приводит к разработке данных или разработке функций, а также к машинному обучению. В этой статье описывается, как обучить модель машинного обучения с помощью искры в синапсе.
Предпосылка
Загрузите приведенный выше обучающий файл в хранилище BLOB-объектов или в ADLS Gen2. Или вы можете использовать функцию оркестровки синапсов, чтобы переместить данные в большой двоичный объект.
For my testing i was able to move the blob storage train.csv into ADLS gen2 filesystem. I did that for just to show how to move data inside synapse analytics.
Сначала давайте подключимся к хранилищу данных, чтобы получить данные
Это нужно для запуска искровой версии.
%%pyspark import pyspark
print(print(pyspark.__version__))
Теперь давайте настроим хранилище BLOB-объектов для получения данных:
spark.conf.set( "fs.azure.account.key.waginput.blob.core.windows.net", "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")
Считайте файл csv во фрейм данных
val df = spark.read.option("header","true").option("inferSchema","true").csv("wasbs://[email protected]/train.csv")
Распечатайте схему:
df.printSchema
Установите список функций для моделирования машинного обучения:
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.Vectors
val featureCols=Array("fare_amount","pickup_longitude","pickup_latitude","dropoff_longitude","dropoff_latitude","passenger_count")
val assembler: org.apache.spark.ml.feature.VectorAssembler= new VectorAssembler().setInputCols(featureCols).setOutputCol("features") val assembledDF = assembler.setHandleInvalid("skip").transform(df) val assembledFinalDF = assembledDF.select("fare_amount","features")
Нормализуйте фрейм данных:
import org.apache.spark.ml.feature.Normalizer
val normalizedDF = new Normalizer().setInputCol("features").setOutputCol("normalizedFeatures").transform(assembledFinalDF)
Отбросьте недостающую точку данных во фрейме данных:
val normalizedDF1 = normalizedDF.na.drop()
Разделить данные для обучения и тестирования
val Array(trainingData, testData) = normalizedDF1.randomSplit(Array(0.7, 0.3))
включает в себя для моделирования
import org.apache.spark.ml.Pipeline import org.apache.spark.ml.classification.{GBTClassificationModel, GBTClassifier} import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
// Index labels, adding metadata to the label column. // Fit on whole dataset to include all labels in index. val labelIndexer = new StringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(normalizedDF1)
// Automatically identify categorical features, and index them. // Set maxCategories so features with > 4 distinct values are treated as continuous. val featureIndexer = new VectorIndexer().setInputCol("features").setOutputCol("indexedFeatures").setMaxCategories(4).fit(normalizedDF1)
Установите параметры модели GBT и входные данные
// Train a GBT model. val gbt = new GBTClassifier().setLabelCol("indexedLabel").setFeaturesCol("indexedFeatures").setMaxIter(10)
Преобразовать ярлык обратно
// Convert indexed labels back to original labels. val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)
Создайте конвейер для запуска обучения модели
// Chain indexers and GBT in a Pipeline. val pipeline = new Pipeline().setStages(Array(labelIndexer, featureIndexer, gbt, labelConverter))
// Train model. This also runs the indexers. val model = pipeline.fit(trainingData)
// Make predictions. val predictions = model.transform(testData)
Распечатать 5 лучших прогнозов
// Select example rows to display. predictions.select("predictedLabel", "label", "features").show(5)
запустить оценщик.
// Select (prediction, true label) and compute test error. val evaluator = new MulticlassClassificationEvaluator().setLabelCol("indexedLabel").setPredictionCol("prediction").setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))
val gbtModel = model.stages(2).asInstanceOf[GBTClassificationModel] println("Learned classification GBT model:\n" + gbtModel.toDebugString)
Первоначально опубликовано на https://github.com.