Привет всем, последние несколько месяцев я работал над масштабируемостью и производством алгоритмов машинного обучения. Я много искал в Интернете и получил очень мало поддержки. Компании все еще пытаются добиться большего успеха в этом сегменте. Согласно опросу, только 4% моделей машинного обучения предназначены для развертывания и производственной среды, это связано с меньшей поддержкой сообщества в этом направлении. Давайте начнем с сегодняшней темы и немного внесем свой вклад в сообщество.
Прежде чем продолжить, нужно знать кое-что: -
- Почему Pyspark и блокнот Databricks?
- Что такое колба и какие альтернативы?
Я использовал pyspark и блокнот Databricks, так как он также хорошо определен для отображения фрейма данных искры и графиков. Databricks также обеспечивает настройку кластера, поэтому мы можем использовать любую конфигурацию машины в кластере для повышения вычислительной мощности. В нашем случае я использовал (3 машины 42GB Ram). Вы можете проверить [здесь] (https://databricks.com/) для получения дополнительной информации и использования.
Здесь для развертывания используется Flask. Flask - это веб-сервер для Python, который помогает создать сервер, который предоставляет конечную точку для обслуживания запроса. Необязательно использовать фляжку, на рынке есть много альтернатив. Позже мы обсудим больше об этом и о том, как использовать и т. Д. Для получения дополнительной информации проверьте (это) [http://flask.pocoo.org/]
Я выбираю очень простой набор данных, здесь используется набор данных Titanic, но любой может играть и с другим набором данных. Некоторые из основных операций предварительной обработки, выполняемых здесь, такие как извлечение признаков, вменение, отбрасывание и т. Д. Я покажу каждый шаг в функциональном виде.
Шаг 1. - Импортируйте всю важную библиотеку
""" Loading important package of spark """ from pyspark.sql import SparkSession from pyspark.ml import Pipeline,PipelineModel from pyspark.sql.functions import * from pyspark.ml.pipeline import Transformer,Estimator from pyspark.ml.feature import StringIndexer,VectorAssembler from pyspark.ml.classification import LogisticRegression from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
Здесь мы используем pipeline
, который в основном работает на таких этапах, как последовательная операция. Позже в коде вы увидите, как я использую конвейер для _2 _, _ 3_ и algorithm
другую импортированную библиотеку, пожалуйста, проверьте на официальном сайте Spark.
Шаг 2. Создайте сеанс Spark
""" Spark session creater """ st = SparkSession \ .builder \ .appName('Titanic') \ .getOrCreate()
Вы также можете установить множество пользовательских опций памяти для этого сеанса, чтобы не усложнять, я использую конфигурацию по умолчанию.
Шаг -3: загрузка набора данных в Spark DataFrame
""" Load data function for loading data.. @param - path - path of file header_value - header value, incase true first row will be header @return - dataframe of loaded intended data. """ def load_data(path,header_value): df = st.read.csv(path,inferSchema=True,header=header_value) return df df = load_data('/FileStore/tables/titanic_train.csv',True) df_test = load_data('/FileStore/tables/titanic_test.csv',True)
Загрузить файл данных, в этом случае файл обучающих и тестовых данных разделен. Для удобства я создал функцию, которая будет вызывать функцию load_data
каждый раз, когда требуется загрузить данные.
Шаг -4. Создайте собственный преобразователь для предварительной обработки данных
''' Custom Transformer class for tranformation implementation . @param - Transformer - Transformer class refrence df - dataframe in which operation need to be carried ( passed through tranform function) A - A class for variable sharing. @return - df - a dataframe which contains prediction value as well with featured value. ''' class preprocess_transform(Transformer): def _transform(self,df): print("******************************** in Transform method ...************************************") """ Generate feature column in dataframe based on specific logic @param - df - dataframe for operation. @return - df - dataframe with generated feature. """ def feature_generation(self,df): df = df.withColumn("Initial",regexp_extract(col("Name"),"([A-Za-z]+)\.",1)) df = df.replace(['Mlle','Mme', 'Ms', 'Dr','Major','Lady','Countess','Jonkheer','Col','Rev','Capt','Sir','Don'], ['Miss','Miss','Miss','Mr','Mr', 'Mrs', 'Mrs', 'Other', 'Other','Other','Mr','Mr','Mr']) df = df.withColumn("Family_Size",col('SibSp')+col('Parch')) df = df.withColumn('Alone',lit(0)) df = df.withColumn("Alone",when(df["Family_Size"] ==0, 1).otherwise(df["Alone"])) return df """ Impute Age based on Age mean of specific gender. ex for male mean is 46 update all null male row with 46, similarly for others @param - df - dataframe for operation @return - df - with imputed value """ def Age_impute(self,df): Age_mean = df.groupBy("Initial").avg('Age') Age_mean = Age_mean.withColumnRenamed('avg(Age)','mean_age') Initials_list = Age_mean.select("Initial").rdd.flatMap(lambda x: x).collect() Mean_list = Age_mean.select("mean_age").rdd.flatMap(lambda x: x).collect() for i,j in zip(Initials_list,Mean_list): df = df.withColumn("Age",when((df["Initial"] == i) & (df["Age"].isNull()), j).otherwise(df["Age"])) return df """ Impute Embark based on mode of embark column @param - df - dataframe for operation @return - df - with imputed value """ def Embark_impute(self,df): mode_value = df.groupBy('Embarked').count().sort(col('count').desc()).collect()[0][0] df = df.fillna({'Embarked':mode_value}) return df """ Impute Fare based on the class which he/she had sat ex: class 3rd has mean fare 9 and null fare belong to 3rd class so fill 9 @param - df - dataframe for operation @return - df - with imputed value """ def Fare_impute(self,df): Select_pclass = df.filter(col('Fare').isNull()).select('Pclass') if Select_pclass.count() > 0: Pclass = Select_pclass.rdd.flatMap(lambda x: x).collect() for i in Pclass: mean_pclass_fare = df.groupBy('Pclass').mean().select('Pclass','avg(Fare)').filter(col('Pclass')== i).collect()[0][1] df = df.withColumn("Fare",when((col('Fare').isNull()) & (col('Pclass') == i),mean_pclass_fare).otherwise(col('Fare'))) return df ''' combining all column imputation together.. @param - df - a dataframe for operation. @return - df - dataframe with imputed value. ''' def all_impute_together(df): df = Age_impute(self,df) df = Embark_impute(self,df) df = Fare_impute(self,df) return df ''' converting string to numeric values. @param - df - dataframe contained all columns. col_list - list of column need to be @return - df - transformed dataframe. ''' def stringToNumeric_conv(df,col_list): indexer = [StringIndexer(inputCol=column,outputCol=column+"_index").fit(df) for column in col_list] string_change_pipeline = Pipeline(stages=indexer) df = string_change_pipeline.fit(df).transform(df) return df """ Drop column from dataframe @param - df - dataframe col_name - name of column which need to be dropped. @return - df - a dataframe except dropped column """ def drop_column(df,col_list): for i in col_list: df = df.drop(col(i)) return df col_list = ["Sex","Embarked","Initial"] dataset = feature_generation(self,df) df_impute = all_impute_together(dataset) df_numeric = stringToNumeric_conv(df_impute,col_list) df_final = drop_column(df_numeric,['Cabin','Name','Ticket','Family_Size','SibSp','Parch','Sex','Embarked','Initial']) return df_final
В классе Transformer представлены различные методы, каждый для разных операций.
Feature_generation () - создать заголовок из столбца имени.
Age_impute () - условно вычислить возраст на основе среднего значения возрастной группы.
Embark_impute и Fare_impute - Impute Посадка и тариф
StringToNumeric () - преобразовать тип данных String в числовой
Drop_col - Удалить ненужные столбцы из фрейма данных
Шаг -5 Создание конвейера и извлечение модели
from pyspark.ml.classification import GBTClassifier from pyspark.ml.classification import RandomForestClassifier from pyspark.ml.evaluation import MulticlassClassificationEvaluator # initialization for pipeline setup my_model = preprocess_transform() df = my_model.transform(df) feature = VectorAssembler(inputCols=['Pclass','Age','Fare','Alone','Sex_index','Embarked_index','Initial_index'],outputCol="features") rf = RandomForestClassifier(labelCol="Survived", featuresCol="features", numTrees=10) ''' pipeline stages initilization , fit and transform. ''' pipeline = Pipeline(stages=[feature,rf]) model = pipeline.fit(df) paramGrid = ParamGridBuilder().addGrid(rf.numTrees,[100,300]).build() evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy") crossval = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=evaluator,numFolds=3) # use 3+ folds in practice # Run cross-validation, and choose the best set of parameters. cvModel = crossval.fit(df) prediction = cvModel.transform(df_test) mlflow.spark.log_model(model, "spark-model16") mlflow.spark.save_model(model, "spark-model_test")
На этом этапе сначала мы вызываем класс трансформатора и преобразуем наш фрейм данных с помощью созданного выше варианта.
После преобразования вызовите FeatureAssembler, который используется для привязки всех входных функций к одному вектору. Создайте объект конвейера, пройдя два этапа: первый - это ассемблер функций, а второй - оценщик (классификатор или регрессор).
Если вы хотите, вы можете создать перекрестную проверку, используя список гиперпараметров, paramGridBuilder использовать для назначения различных гиперпараметров с использованием списка значений.
Установите Evaluator для измерения таких критериев, как MulticlassClassificationEvaluator
Вызвать кросс-валидатор и передать модель конвейера, сетку параметров, вычислитель и количество сверток.
Как только все это будет сделано, поместите данные в функцию кросс-валидатора, помните, что она содержит конвейер, у которого есть модель. Таким образом, он будет обучать модель с различными комбинациями параметров и, наконец, использовать метод transform
для получения прогнозируемых данных.
MLFlow - это платформа, которая управляет циклом машинного обучения. После предсказания мы можем использовать две функции потока мл: log
и save
. Функция журнала будет регистрировать метрики обработки на портале ML FLow, а функция Сохранить сохранит лучшую модель ml. ML-поток имеет много других полезных функций, поэтому просто ознакомьтесь с их официальной документацией.
Шаг 6: развертывание в Azure с использованием Azure ML
import mlflow.azureml from azureml.core import Workspace from azureml.core.webservice import AciWebservice, Webservice
Создайте рабочую область в Azure ML.
workspace_name = "MLServiceDMWS11" subscription_id = "xxxxxxxx-23ad-4272-xxxx-0d504b07d497" resource_group = "mlservice_ws" location = "xxx" azure_workspace = Workspace.create(name=workspace_name, subscription_id=subscription_id, resource_group=resource_group, location=location, create_resource_group=False, )
Создание образа модели в рабочей области, по сути, означает, что мы просто сохраняем объект модели в рабочей области.
azure_image, azure_model = mlflow.azureml.build_image(model_uri="/dbfs/databricks/mlflow/my_test_ml_flow", workspace=azure_workspace, description="model_description", synchronous=True)
При использовании API веб-службы Azure модель будет отображаться как конечная точка отдыха. Передача изображения модели, рабочего пространства и настройки конфигурации.
webservice_deployment_config = AciWebservice.deploy_configuration() webservice = Webservice.deploy_from_image(deployment_config=webservice_deployment_config, image=azure_image, workspace=azure_workspace, name='mysvc') webservice.wait_for_deployment() print("Scoring URI is: %s", webservice.scoring_uri)
После развертывания модели давайте просто проверим, работает ли созданный нами API. Передайте параметр и соответствующее значение в списке и отправьте почтовый запрос. Как только запрос будет успешным, он подтвердит ответ. Стандартный способ приема i / p и o / p - в формате json.
import requests import json sample_input = { "columns": [ "col1", "col2", "col3", "coln " ], "data": [ [val1, val2, val3,...... valn] ] } response = requests.post( url=webservice.scoring_uri, data=json.dumps(sample_input), headers={"Content-type": "application/json"}) response_json = json.loads(response.text) print(response_json)
Это только начальный набросок этой статьи, я обновлю его в ближайшем будущем. Вы можете проверить весь код по моей ссылке git ниже. Всегда признателен, если у вас есть отзывы или предложения.
Спасибо за поддержку! Скоро увидимся в следующий раз :)