Введение

Идея этой статьи возникла из моего опыта разработки решения для одного из наших корпоративных розничных клиентов. Они столкнулись с проблемой расширения клиентской базы, а их текущие модели машинного обучения для индивидуальных рекомендаций клиентов не могли поспевать за этим ростом.

Как правило, библиотеки машинного обучения, изначально не предназначенные для использования преимуществ распределенных систем, не будут эффективно масштабироваться, даже если они работают в кластере из 100 исполнителей. В таких случаях все данные собираются на узле драйвера кластера, и в зависимости от объема данных это может привести к снижению производительности обучения модели с течением времени или даже к сбою обучения.

В этой статье мы обсудим следующие темы:
1. Важность группового машинного обучения.
2. Распараллеливание группового машинного обучения для обучения.
3. Распараллеливание группового машинного обучения. Специальное машинное обучение для логического вывода.

Прочитав эту статью, вы лучше поймете, как масштабировать обучение модели машинного обучения для конкретной группы с одним узлом с помощью API функций Spark и Pandas в кластере Databricks. Кроме того, вы можете найти все примеры кода в моем репозитории GitHub.

Необходимость группового машинного обучения

Допустим, вы работаете в сфере розничной торговли/потребителей потребительских товаров и хотите разработать для своих клиентов рекомендацию или модель пожизненной ценности клиента. Кроме того, вы можете создать модель для каждого из ваших продуктов или магазинов. Этот сценарий является ярким примером использования группового машинного обучения.

Точно так же в финансовой отрасли, если вы хотите разработать модели, специфичные для каждого символа акции, это тоже пример группового машинного обучения. Эти типы вариантов использования существуют почти в каждой отрасли. Основная цель — обучить машину для каждой группы в нашем наборе данных. Группы определяются как отдельные значения для данного столбца в наших данных.

Однако внедрение этих моделей представляет собой серьезную проблему. Вместо обучения одной модели вам может понадобиться построить сотни или даже тысячи моделей, которые должны быть построены последовательно. Это создает существенные проблемы с масштабированием.

Если бы каждая модель строилась последовательно, потребовалось бы значительное количество времени для перебора каждой группы и обучения модели по одной за раз.

Давайте рассмотрим пример того, как вы будете распараллеливать модели машинного обучения для конкретных групп для обучения.

Распараллеливание группового машинного обучения для обучения

Чтобы распараллелить обучение моделей машинного обучения для конкретных групп, мы будем использовать кластер Spark, управляемый Databricks. Если данные для каждой группы достаточно малы, чтобы поместиться в исполнителе, мы можем построить каждую модель одновременно.

Используя этот подход, мы можем значительно ускорить процесс обучения машинному обучению. Самое приятное то, что он поддерживается Pandas Functions API, который похож на Pandas UDF. Ранее я написал статью о Pandas UDF, к которой вы можете обратиться для дальнейшего понимания.

По сути, Pandas Function API позволяет пользователям применять собственные функции Python к Spark DataFrames. Это достигается за счет использования Apache Arrow для эффективной передачи данных между Pandas DataFrames и Spark DataFrames.

Несмотря на то, что между Pandas UDF и функциями есть некоторые критические различия, о которых я расскажу в следующем сообщении в блоге, многие пользователи могут счесть Pandas Function API более простым в использовании. Одним из существенных преимуществ является то, что функции Pandas не требуют подсказок типа.

Кроме того, Pandas Function API включает в себя множество знакомых и часто используемых функций, таких как рабочий процесс «разделить-применить-объединить».

Этот конкретный рабочий процесс называется шаблоном Групповая карта, и именно о нем мы поговорим в этом блоге.

Мы берем весь набор данных, разбиваем его по групповому значению и обучаем на них отдельные модели машинного обучения. После обучения моделей мы объединяем их в итоговую таблицу, содержащую запись для каждой группы и путь к обученной модели.

Давайте рассмотрим пример кода, который использует платформу ИИ Databricks для масштабирования нашего кода машинного обучения и MLflow для мониторинга различных обученных моделей.

Если вы не знакомы с MLflow, в будущем я напишу сообщение в блоге, чтобы объяснить его более подробно. На высоком уровне MLflow — это проект с открытым исходным кодом, который помогает управлять сквозным машинным обучением и жизненным циклом разработки искусственного интеллекта. Одной из его функций является отслеживание моделей, которое позволяет нам отслеживать каждую модель, которую мы обучаем, включая связанные комбинации гиперпараметров и сгенерированные метрики.

В нашем примере мы будем работать с фиктивными данными, имитируя записи о продажах с веб-сайта электронной коммерции.

- `record_id`: 100k unique records. Each record will have a unique numeric id.
- `product_id`: 10 different products
- `numeric_feature_1`: a numeric feature 
- `numeric_feature_2`: another numeric feature
- `label`: numeric column we are trying to predict. In practical world this can be total sales, conversion etc

Чтобы использовать API функций Pandas для обучения модели, нам нужно определить схему DataFrame, которая будет возвращена функцией после обучения модели для каждого продукта.

Мы будем использовать все доступные данные для данного продукта, идентифицированного его product_id, для обучения модели. Мы запишем размер обучающего набора данных, место сохранения модели и метрику обученной модели для дальнейшего использования.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType

trained_models_info_schema = StructType([
  StructField("product_id", IntegerType()), 
  StructField("training_sample_size", IntegerType()),    
  StructField("model_path", StringType()), 
  StructField("rmse", FloatType())          
])

Прежде чем мы рассмотрим следующий набор кода, нам нужно понять, что такое эксперимент MLflow и его связь с запуском.

В MLflow эксперимент — это контейнер для набора запусков, организованных вокруг общей цели. Эксперимент можно рассматривать как логический контейнер для набора запусков, где каждый запуск представляет собой одно выполнение рабочего процесса машинного обучения.

Запуск в MLflow представляет собой однократное выполнение рабочего процесса машинного обучения, например обучение модели или запуск пакетного задания прогнозирования. Каждый запуск связан с экспериментом и содержит набор метаданных запуска, таких как дата и время запуска, параметры и показатели, которые были зарегистрированы во время запуска, а также исходный код, который использовался для запуска рабочего процесса.

Организуя прогоны в эксперименты, мы можем группировать связанные прогоны вместе и легко сравнивать их результаты.

import mlflow

#lets create a mlflow experiment to track our model training
#by default each python notebook in Databricks 
# has its own experiment however we can define our own

# Define the name of the experiment to create
# this has to be absolute path
experiment_name = "/Shared/product_forecasting_experiment"

# Check if the experiment already exists
existing_experiment = mlflow.get_experiment_by_name(experiment_name)

# If the experiment exists, delete it
if existing_experiment:
    mlflow.delete_experiment(existing_experiment.experiment_id)

# Create a new experiment with the given name
experiment_id = mlflow.create_experiment(experiment_name)

print(experiment_id)

Это создаст пустой эксперимент для отслеживания обучения нашей модели в будущем.

Если щелкнуть значок эксперимента, вы увидите идентификатор эксперимента и место, где будут храниться артефакты для запусков, зарегистрированных в этом эксперименте.

Затем мы определяем функцию Pandas, которая берет все данные для указанного устройства, обучает модель, сохраняет ее как вложенный запуск и возвращает Pandas DataFrame со схемой, которую мы определили ранее.

import mlflow.sklearn
import pandas as pd
import numpy as np
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

def train_model(df_pandas: pd.DataFrame) -> pd.DataFrame:
  """
  Trains GradientBoostingRegressor model on a group of data 
  """
  #collect information about the current DataFrame that is being processed
  #get the product_id for which model is being trained
  product_id = df_pandas["product_id"].iloc[0]
  #get the number of records in the DataFrame
  training_sample_size = df_pandas.shape[0]
  #get the run_id for the current MLflow run for logging later
  run_id = df_pandas["run_id"].iloc[0] 
  #get experiment_id so that we can log all the runs under that experiment
  experiment_id = df_pandas["experiment_id"].iloc[0]
  
  # Create the Gradient Boosting Regression model
  gbr = GradientBoostingRegressor(n_estimators=100, learning_rate=0.1, max_depth=2, random_state=42)
  
  # Define features to train on and the label
  X = df_pandas[["numeric_feature1", "numeric_feature2"]]
  y = df_pandas["label"]
  
  # Split the data into training and test sets
  X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=.2, random_state=42)

  # Train the model on the training data
  gbr.fit(X_train, y_train)

  # Evaluate model
  predictions = gbr.predict(X_test)
  rmse = np.sqrt(mean_squared_error(y_test, predictions)) 

  # we will use the top level run_id so that the current model can be logged as part of it
  with mlflow.start_run(run_id=run_id, experiment_id=experiment_id) as umbrella_run:
    print(f"Current experiment_id = {experiment_id}")
    
    # Create a nested run for the specific product
    with mlflow.start_run(run_name=str(product_id), nested=True, experiment_id=experiment_id) as run:
      mlflow.sklearn.log_model(gbr, str(product_id))
      mlflow.log_metric("rmse", rmse)
      
      artifact_uri = f"runs:/{run.info.run_id}/{product_id}"
      
      return_df = pd.DataFrame([[product_id, training_sample_size, artifact_uri, rmse]], 
        columns=["product_id", "training_sample_size", "model_path", "rmse"])

  return return_df 

Теперь мы применим эту функцию Pandas к нашим сгруппированным данным. В примере, с которым мы работаем, мы будем повторно использовать обучающие данные, которые включают идентификаторы продукта, идентификаторы запуска и идентификаторы эксперимента

from pyspark.sql.functions import lit
#explicitly set the experiment for the moddel trainings we are going to perform
with mlflow.start_run(run_name="Model Training for all the products", experiment_id=experiment_id) as run:
  run_id = run.info.run_id
    
  model_training_info_df = (df
    .withColumn("run_id", lit(run_id)) 
    .withColumn("experiment_id", lit(experiment_id))
    .groupby("product_id")
    .applyInPandas(train_model, schema=trained_models_info_schema)
    .cache()
  )
  
display(model_training_info_df)

После запуска этого блока кода мы видим, что наши различные запуски модели были зарегистрированы в рамках нашего эксперимента.

Если мы посмотрим на результирующий DataFrame, сгенерированный после запуска функции Pandas, мы увидим 10 записей, соответствующих каждому идентификатору продукта и обученной модели.

Теперь давайте посмотрим, как мы можем использовать эти обученные модели для распараллеливания вывода модели.

Распараллеливание группового машинного обучения для логического вывода

Для логического вывода мы соединим нашу model_training_info_df с данными, которые мы хотим использовать для логического вывода. В этом примере мы будем повторно использовать данные, которые мы использовали для обучения.

prediction_schema = StructType([
  StructField("record_id", IntegerType()),
  StructField("prediction", FloatType())
])

combined_df = (df
  .join(model_training_info_df, on="product_id", how="left")
)

display(combined_df)

Далее мы определим функцию Pandas для выполнения логического вывода о нашем Combined_df.

def predict(df_pandas: pd.DataFrame) -> pd.DataFrame:
  """
  Applies model to data for a particular product, represented as a pandas DataFrame
  """
  model_path = df_pandas["model_path"].iloc[0]
  
  X = df_pandas[["numeric_feature1", "numeric_feature2"]]
  
  #loads model only once for all the data for a particular product
  model = mlflow.sklearn.load_model(model_path)
  prediction = model.predict(X)
  
  result_df = pd.DataFrame({
    "record_id": df_pandas["record_id"],
    "prediction": prediction
  })
  return result_df

наконец, мы выполняем масштабируемое прогнозирование для каждого отдельного продукта.

predictions_df = combined_df.groupby("product_id").applyInPandas(predict, schema=prediction_schema)
display(predictions_df)

Краткое содержание

В этом сообщении блога мы обсудили, как вы можете масштабировать свою нераспределенную модель машинного обучения с помощью Databricks для удовлетворения растущих потребностей вашего бизнеса и данных.

Мы также рассмотрели подробный пример шаблона Групповая карта с использованием Pandas Function API.

Доступны и другие шаблоны, а именно Карта и Групповая карта, о которых вы можете прочитать здесь.

Если вы нашли этот пост в блоге полезным, поделитесь своими мыслями и дайте мне знать о любых случаях использования, в которых вы нашли этот шаблон особенно полезным.