Привет всем, последние несколько месяцев я работал над масштабируемостью и производством алгоритмов машинного обучения. Я много искал в Интернете и получил очень мало поддержки. Компании все еще пытаются добиться большего успеха в этом сегменте. Согласно опросу, только 4% моделей машинного обучения предназначены для развертывания и производственной среды, это связано с меньшей поддержкой сообщества в этом направлении. Давайте начнем с сегодняшней темы и немного внесем свой вклад в сообщество.

Прежде чем продолжить, нужно знать кое-что: -

  1. Почему Pyspark и блокнот Databricks?
  2. Что такое колба и какие альтернативы?

Я использовал pyspark и блокнот Databricks, так как он также хорошо определен для отображения фрейма данных искры и графиков. Databricks также обеспечивает настройку кластера, поэтому мы можем использовать любую конфигурацию машины в кластере для повышения вычислительной мощности. В нашем случае я использовал (3 машины 42GB Ram). Вы можете проверить [здесь] (https://databricks.com/) для получения дополнительной информации и использования.

Здесь для развертывания используется Flask. Flask - это веб-сервер для Python, который помогает создать сервер, который предоставляет конечную точку для обслуживания запроса. Необязательно использовать фляжку, на рынке есть много альтернатив. Позже мы обсудим больше об этом и о том, как использовать и т. Д. Для получения дополнительной информации проверьте (это) [http://flask.pocoo.org/]

Я выбираю очень простой набор данных, здесь используется набор данных Titanic, но любой может играть и с другим набором данных. Некоторые из основных операций предварительной обработки, выполняемых здесь, такие как извлечение признаков, вменение, отбрасывание и т. Д. Я покажу каждый шаг в функциональном виде.

Шаг 1. - Импортируйте всю важную библиотеку

"""
Loading important package of spark 
"""
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline,PipelineModel
from pyspark.sql.functions import *
from pyspark.ml.pipeline import Transformer,Estimator
from pyspark.ml.feature import StringIndexer,VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

Здесь мы используем pipeline, который в основном работает на таких этапах, как последовательная операция. Позже в коде вы увидите, как я использую конвейер для _2 _, _ 3_ и algorithm

другую импортированную библиотеку, пожалуйста, проверьте на официальном сайте Spark.

Шаг 2. Создайте сеанс Spark

"""
Spark session creater 
"""

st = SparkSession \
        .builder \
        .appName('Titanic') \
        .getOrCreate()

Вы также можете установить множество пользовательских опций памяти для этого сеанса, чтобы не усложнять, я использую конфигурацию по умолчанию.

Шаг -3: загрузка набора данных в Spark DataFrame

"""
Load data function for loading data..
@param - 
        path - path of file
        header_value - header value, incase true first row will be header
        
@return - dataframe of loaded intended data.
"""

def load_data(path,header_value):
  df = st.read.csv(path,inferSchema=True,header=header_value)
  return df
df = load_data('/FileStore/tables/titanic_train.csv',True) 
df_test = load_data('/FileStore/tables/titanic_test.csv',True)

Загрузить файл данных, в этом случае файл обучающих и тестовых данных разделен. Для удобства я создал функцию, которая будет вызывать функцию load_data каждый раз, когда требуется загрузить данные.

Шаг -4. Создайте собственный преобразователь для предварительной обработки данных

'''
Custom Transformer class for tranformation implementation .

@param - 
       Transformer - Transformer class refrence 
       df - dataframe in which operation need to be carried ( passed through tranform function)
       A - A class for variable sharing.

@return -
       df - a dataframe which contains prediction value as well with featured value. 

'''

class preprocess_transform(Transformer):
  
    def _transform(self,df):
      print("********************************  in Transform method ...************************************")
      
      
      """
      Generate feature column in dataframe based on specific logic

      @param - 
               df - dataframe for operation.

      @return - 
               df - dataframe with generated feature.
      """
      
      
      def feature_generation(self,df):
        df = df.withColumn("Initial",regexp_extract(col("Name"),"([A-Za-z]+)\.",1))
        df = df.replace(['Mlle','Mme', 'Ms', 'Dr','Major','Lady','Countess','Jonkheer','Col','Rev','Capt','Sir','Don'],
                        ['Miss','Miss','Miss','Mr','Mr',  'Mrs',  'Mrs',  'Other',  'Other','Other','Mr','Mr','Mr'])
        df = df.withColumn("Family_Size",col('SibSp')+col('Parch'))
        df = df.withColumn('Alone',lit(0))
        df = df.withColumn("Alone",when(df["Family_Size"] ==0, 1).otherwise(df["Alone"]))
        return df


      """
      Impute Age based on Age mean of specific gender. ex for male mean is 46 update all null male row with 46, similarly for others

      @param - 
              df - dataframe for operation

      @return -
             df - with imputed value

      """
  
      def Age_impute(self,df):
        Age_mean = df.groupBy("Initial").avg('Age')
        Age_mean = Age_mean.withColumnRenamed('avg(Age)','mean_age')
        Initials_list = Age_mean.select("Initial").rdd.flatMap(lambda x: x).collect()
        Mean_list = Age_mean.select("mean_age").rdd.flatMap(lambda x: x).collect()
        for i,j in zip(Initials_list,Mean_list):
            df = df.withColumn("Age",when((df["Initial"] == i) & (df["Age"].isNull()), j).otherwise(df["Age"]))

        return df
        
        
      """
      Impute Embark based on mode of embark column
      @param - 
              df - dataframe for operation

      @return -
             df - with imputed value

      """
      def Embark_impute(self,df):
        mode_value = df.groupBy('Embarked').count().sort(col('count').desc()).collect()[0][0]
        df = df.fillna({'Embarked':mode_value})
        return df
      
      
      """
      Impute Fare based on the class which he/she had sat ex: class 3rd has mean fare 9 and null fare belong to 3rd class so fill 9
      @param - 
              df - dataframe for operation

      @return -
             df - with imputed value

      """
      def Fare_impute(self,df):
        Select_pclass = df.filter(col('Fare').isNull()).select('Pclass')
        if Select_pclass.count() > 0:
          Pclass = Select_pclass.rdd.flatMap(lambda x: x).collect()
          for i in Pclass:
            mean_pclass_fare = df.groupBy('Pclass').mean().select('Pclass','avg(Fare)').filter(col('Pclass')== i).collect()[0][1]
            df = df.withColumn("Fare",when((col('Fare').isNull()) & (col('Pclass') == i),mean_pclass_fare).otherwise(col('Fare')))
        return df
      
      
      '''
      combining all column imputation together..

      @param - 
            df - a dataframe for operation.

      @return - 
            df - dataframe with imputed value.

      '''
      def all_impute_together(df):
        df = Age_impute(self,df)
        df = Embark_impute(self,df)
        df = Fare_impute(self,df)
        return df
      
      
      '''
      converting string to numeric values.

      @param - 
               df - dataframe contained all columns.
               col_list - list of column need to be 

      @return - 
              df - transformed dataframe.
      '''
      def stringToNumeric_conv(df,col_list):
        indexer = [StringIndexer(inputCol=column,outputCol=column+"_index").fit(df) for column in col_list]
        string_change_pipeline = Pipeline(stages=indexer)
        df = string_change_pipeline.fit(df).transform(df)
        return df

      
      """
      Drop column from dataframe
      @param -
             df - dataframe 
             col_name - name of column which need to be dropped.
      @return -
             df - a dataframe except dropped column
      """
      def drop_column(df,col_list):
        for i in col_list:
            df = df.drop(col(i))
        return df
      
      
      col_list = ["Sex","Embarked","Initial"]
      dataset = feature_generation(self,df)
      df_impute = all_impute_together(dataset)
      df_numeric = stringToNumeric_conv(df_impute,col_list)
      df_final = drop_column(df_numeric,['Cabin','Name','Ticket','Family_Size','SibSp','Parch','Sex','Embarked','Initial'])
      return df_final

В классе Transformer представлены различные методы, каждый для разных операций.

Feature_generation () - создать заголовок из столбца имени.

Age_impute () - условно вычислить возраст на основе среднего значения возрастной группы.

Embark_impute и Fare_impute - Impute Посадка и тариф

StringToNumeric () - преобразовать тип данных String в числовой

Drop_col - Удалить ненужные столбцы из фрейма данных

Шаг -5 Создание конвейера и извлечение модели

from pyspark.ml.classification import GBTClassifier 
from pyspark.ml.classification import RandomForestClassifier 
from pyspark.ml.evaluation import MulticlassClassificationEvaluator  

# initialization for pipeline setup 
my_model = preprocess_transform() 
df = my_model.transform(df) 
feature = VectorAssembler(inputCols=['Pclass','Age','Fare','Alone','Sex_index','Embarked_index','Initial_index'],outputCol="features")  
 rf = RandomForestClassifier(labelCol="Survived", featuresCol="features", numTrees=10) 
''' pipeline stages initilization , fit and transform. ''' 
pipeline = Pipeline(stages=[feature,rf])  
model = pipeline.fit(df)  
paramGrid = ParamGridBuilder().addGrid(rf.numTrees,[100,300]).build()  
evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")  
crossval = CrossValidator(estimator=pipeline,                           estimatorParamMaps=paramGrid,                        evaluator=evaluator,numFolds=3)  
# use 3+ folds in practice  # Run cross-validation, and choose the best set of parameters. 
cvModel = crossval.fit(df) 
prediction = cvModel.transform(df_test)   
mlflow.spark.log_model(model, "spark-model16")  mlflow.spark.save_model(model, "spark-model_test") 

На этом этапе сначала мы вызываем класс трансформатора и преобразуем наш фрейм данных с помощью созданного выше варианта.

После преобразования вызовите FeatureAssembler, который используется для привязки всех входных функций к одному вектору. Создайте объект конвейера, пройдя два этапа: первый - это ассемблер функций, а второй - оценщик (классификатор или регрессор).

Если вы хотите, вы можете создать перекрестную проверку, используя список гиперпараметров, paramGridBuilder использовать для назначения различных гиперпараметров с использованием списка значений.

Установите Evaluator для измерения таких критериев, как MulticlassClassificationEvaluator

Вызвать кросс-валидатор и передать модель конвейера, сетку параметров, вычислитель и количество сверток.

Как только все это будет сделано, поместите данные в функцию кросс-валидатора, помните, что она содержит конвейер, у которого есть модель. Таким образом, он будет обучать модель с различными комбинациями параметров и, наконец, использовать метод transform для получения прогнозируемых данных.

MLFlow - это платформа, которая управляет циклом машинного обучения. После предсказания мы можем использовать две функции потока мл: log и save. Функция журнала будет регистрировать метрики обработки на портале ML FLow, а функция Сохранить сохранит лучшую модель ml. ML-поток имеет много других полезных функций, поэтому просто ознакомьтесь с их официальной документацией.

Шаг 6: развертывание в Azure с использованием Azure ML

import mlflow.azureml
from azureml.core import Workspace
from azureml.core.webservice import AciWebservice, Webservice

Создайте рабочую область в Azure ML.

workspace_name = "MLServiceDMWS11"
subscription_id = "xxxxxxxx-23ad-4272-xxxx-0d504b07d497"
resource_group = "mlservice_ws"
location = "xxx"
azure_workspace = Workspace.create(name=workspace_name,
                                   subscription_id=subscription_id,
                                   resource_group=resource_group,
                                   location=location,
                                   create_resource_group=False,
                                  )

Создание образа модели в рабочей области, по сути, означает, что мы просто сохраняем объект модели в рабочей области.

azure_image, azure_model = mlflow.azureml.build_image(model_uri="/dbfs/databricks/mlflow/my_test_ml_flow",
                                                      workspace=azure_workspace,
                                                      description="model_description",
                                                      synchronous=True)

При использовании API веб-службы Azure модель будет отображаться как конечная точка отдыха. Передача изображения модели, рабочего пространства и настройки конфигурации.

webservice_deployment_config = AciWebservice.deploy_configuration()
webservice = Webservice.deploy_from_image(deployment_config=webservice_deployment_config,
                                          image=azure_image, 
                                          workspace=azure_workspace, 
                                          name='mysvc')
webservice.wait_for_deployment()
print("Scoring URI is: %s", webservice.scoring_uri)

После развертывания модели давайте просто проверим, работает ли созданный нами API. Передайте параметр и соответствующее значение в списке и отправьте почтовый запрос. Как только запрос будет успешным, он подтвердит ответ. Стандартный способ приема i / p и o / p - в формате json.

import requests
import json

sample_input = {
    "columns": [
        "col1",
        "col2",
        "col3",
       "coln "
    ],
    "data": [
        [val1, val2, val3,...... valn]
    ]
}
response = requests.post(
              url=webservice.scoring_uri, data=json.dumps(sample_input),
              headers={"Content-type": "application/json"})
response_json = json.loads(response.text)
print(response_json)

Это только начальный набросок этой статьи, я обновлю его в ближайшем будущем. Вы можете проверить весь код по моей ссылке git ниже. Всегда признателен, если у вас есть отзывы или предложения.



Спасибо за поддержку! Скоро увидимся в следующий раз :)