Для задачи двоичной классификации с несбалансированными классами
Вступление
Вычисления в памяти и параллельная обработка являются одними из основных причин того, что Apache Spark стал очень популярным в индустрии больших данных для работы с крупномасштабными продуктами данных и более быстрого анализа. MLlib, построенный на основе Spark, представляет собой масштабируемую библиотеку машинного обучения, которая обеспечивает как высококачественные алгоритмы, так и молниеносную скорость. Обладая отличными API-интерфейсами для Java, Python и Scala, он является лучшим выбором для аналитиков данных, инженеров данных и специалистов по данным. MLlib состоит из общих алгоритмов обучения и утилит, включая классификацию, регрессию, кластеризацию, совместную фильтрацию (матричную факторизацию), уменьшение размерности и т. Д.
Реализация
В этой статье мы собираемся построить сквозную модель машинного обучения с использованием MLlib в pySpark. мы собираемся использовать набор реальных данных из конкурса Риск дефолта Home Credit на kaggle. Цель этого конкурса состояла в том, чтобы определить, способны ли соискатели ссуды выплатить свои ссуды на основе данных, которые были собраны от каждого соискателя. целевая переменная - либо 0 (кандидаты, которые смогли выплатить свои ссуды), либо 1 (кандидаты, которые НЕ смогли выплатить свои ссуды). это проблема двоичной классификации с сильно несбалансированной целевой меткой. коэффициент распределения близок к 0,91–0,09, при этом 0,91 - это соотношение заявителей, которые смогли выплатить свои займы, а 0,09 - это соотношение заявителей, которые не смогли выплатить свои займы.
Давайте начнем с рассмотрения структуры нашего набора данных:
#we use the findspark library to locate spark on our local machine import findspark findspark.init('home Diredtory of Spark') from pyspark.sql import SparkSession # initiate our session and read the main CSV file, then we print the #dataframe schema spark = SparkSession.builder.appName('imbalanced_binary_classification').getOrCreate() new_df = spark.read.csv('application_train.csv', header=True, inferSchema=True) new_df.printSchema() root |-- SK_ID_CURR: integer (nullable = true) |-- TARGET: integer (nullable = true) |-- NAME_CONTRACT_TYPE: string (nullable = true) |-- CODE_GENDER: string (nullable = true) |-- FLAG_OWN_CAR: string (nullable = true) |-- FLAG_OWN_REALTY: string (nullable = true) |-- CNT_CHILDREN: integer (nullable = true) |-- AMT_INCOME_TOTAL: double (nullable = true) |-- AMT_CREDIT: double (nullable = true) |-- AMT_ANNUITY: double (nullable = true) |-- AMT_GOODS_PRICE: double (nullable = true) |-- NAME_TYPE_SUITE: string (nullable = true) |-- NAME_INCOME_TYPE: string (nullable = true) |-- NAME_EDUCATION_TYPE: string (nullable = true) |-- NAME_FAMILY_STATUS: string (nullable = true) |-- NAME_HOUSING_TYPE: string (nullable = true) |-- REGION_POPULATION_RELATIVE: double (nullable = true) ...
printSchema () показывает нам только имена столбцов и их тип данных. мы собираемся отбросить столбец SK_ID_CURR, переименовать столбец «TARGET» в «label» и посмотреть распределение нашей целевой переменной:
# Sk_ID_Curr is the id column which we dont need it in the process #so we get rid of it. and we rename the name of our # target variable to "label" drop_col = ['SK_ID_CURR'] new_df = new_df.select([column for column in new_df.columns if column not in drop_col]) new_df = new_df.withColumnRenamed('TARGET', 'label') new_df.groupby('label').count().toPandas()
мы можем визуализировать распределение меток с помощью matplotlib:
# let's have a look at the distribution of our target variable: # to make it look better, we first convert our spark df to a Pandas import matplotlib.pyplot as plt import seaborn as sns %matplotlib inline df_pd = new_df.toPandas() print(len(df_pd)) plt.figure(figsize=(12,10)) sns.countplot(x='label', data=df_pd, order=df_pd['label'].value_counts().index)
и вот как данные выглядят в формате фрейма данных Pandas:
# let's see how everything look in Pandas import pandas as pd pd.DataFrame(new_df.take(10), columns= new_df.columns)
Преодоление данных
Теперь, когда у нас есть некоторые идеи об общей структуре нашего набора данных, давайте продолжим анализ данных. Сначала мы проверяем, сколько у нас есть категориальных и числовых функций. Затем мы создаем функцию, которая выводит важную информацию об отсутствующих значениях в нашем наборе данных:
# now let's see how many categorical and numerical features we have: cat_cols = [item[0] for item in new_df.dtypes if item[1].startswith('string')] print(str(len(cat_cols)) + ' categorical features') num_cols = [item[0] for item in new_df.dtypes if item[1].startswith('int') | item[1].startswith('double')][1:] print(str(len(num_cols)) + ' numerical features') 16 categorical features 104 numerical features
вот как мы получаем таблицу для отсутствующей информации:
# we use the below function to find more information about the #missing values def info_missing_table(df_pd): """Input pandas dataframe and Return columns with missing value and percentage""" mis_val = df_pd.isnull().sum() #count total of null in each columns in dataframe #count percentage of null in each columns mis_val_percent = 100 * df_pd.isnull().sum() / len(df_pd) mis_val_table = pd.concat([mis_val, mis_val_percent], axis=1) #join to left (as column) between mis_val and mis_val_percent mis_val_table_ren_columns = mis_val_table.rename( columns = {0 : 'Missing Values', 1 : '% of Total Values'}) #rename columns in table mis_val_table_ren_columns = mis_val_table_ren_columns[ mis_val_table_ren_columns.iloc[:,1] != 0].sort_values('% of Total Values', ascending=False).round(1) print ("Your selected dataframe has " + str(df_pd.shape[1]) + " columns.\n" #.shape[1] : just view total columns in dataframe "There are " + str(mis_val_table_ren_columns.shape[0]) + " columns that have missing values.") #.shape[0] : just view total rows in dataframe return mis_val_table_ren_columns missings = info_missing_table(df_pd) missings
в 67 столбцах из 121 отсутствуют значения. и он не показывает их все на изображении, но в целом большинство из этих 67 столбцов имеют более 50% пропущенных значений. поэтому мы имеем дело с множеством пропущенных значений. мы собираемся заполнить числовые пропущенные значения средним значением каждого столбца, а категориальные пропущенные значения - наиболее частой категорией каждого столбца. но сначала давайте посчитаем недостающие значения в каждом столбце:
miss_counts = count_missings(new_df) miss_counts [('AMT_ANNUITY', 12), ('AMT_GOODS_PRICE', 278), ('NAME_TYPE_SUITE', 1292), ('OWN_CAR_AGE', 202929), ('OCCUPATION_TYPE', 96391), ('CNT_FAM_MEMBERS', 2), ('EXT_SOURCE_1', 173378), ('EXT_SOURCE_2', 660), ('EXT_SOURCE_3', 60965), ('APARTMENTS_AVG', 156061), ('BASEMENTAREA_AVG', 179943), ('YEARS_BEGINEXPLUATATION_AVG', 150007), ('YEARS_BUILD_AVG', 204488), ...
разделяем категориальные и числовые столбцы с пропущенными значениями:
# here we seperate missing columns in our new_df based on #categorical and numerical types list_cols_miss=[x[0] for x in miss_counts] df_miss= new_df.select(*list_cols_miss) #categorical columns catcolums_miss=[item[0] for item in df_miss.dtypes if item[1].startswith('string')] #will select name of column with string data type print("cateogrical columns_miss:", catcolums_miss) ### numerical columns numcolumns_miss = [item[0] for item in df_miss.dtypes if item[1].startswith('int') | item[1].startswith('double')] #will select name of column with integer or double data type print("numerical columns_miss:", numcolumns_miss)
далее заполняем недостающие значения:
# now that we have seperated the columns based on categorical and #numerical types, we will fill the missing categiracl # values with the most frequent category from pyspark.sql.functions import rank,sum,col df_Nomiss=new_df.na.drop() for x in catcolums_miss: mode=df_Nomiss.groupBy(x).count().sort(col("count").desc()).collect()[0][0] print(x, mode) #print name of columns and it's most categories new_df = new_df.na.fill({x:mode}) # and we fill the missing numerical values with the average of each #column from pyspark.sql.functions import mean, round for i in numcolumns_miss: meanvalue = new_df.select(round(mean(i))).collect()[0][0] print(i, meanvalue) new_df=new_df.na.fill({i:meanvalue})
Теперь, когда в нашем наборе данных больше нет пропущенных значений, давайте поработаем над тем, как бороться с несбалансированными классами. существуют разные методы решения этой проблемы. один из способов - уменьшить выборку класса большинства или превзойти выборку класса меньшинства, чтобы получить более сбалансированные результаты. другой способ - присвоить веса каждому классу, чтобы наказать класс большинства, присвоив меньший вес, и повысить класс меньшинства, назначив больший вес. мы собираемся создать новый столбец в наборе данных с именем «веса» и назначить обратное соотношение для каждого класса в качестве весов. вот как это делается:
# adding the new column weights and fill it with ratios from pyspark.sql.functions import when ratio = 0.91 def weight_balance(labels): return when(labels == 1, ratio).otherwise(1*(1-ratio)) new_df = new_df.withColumn('weights', weight_balance(col('label')))
и вот как это выглядит после добавления столбца веса:
Функциональная инженерия
следующий шаг - Разработка функций. pySpark сделал это настолько простым, что нам не нужно много делать для извлечения функций. вот шаги:
- мы применяем StringIndexer () для присвоения индексов каждой категории в наших категориальных столбцах.
- мы применяем OneHotEncoderEstimator () для преобразования категориальных столбцов в векторы с однократным кодированием.
- и мы применяем VectorAssembler () для создания вектора признаков из всех категориальных и числовых признаков, и мы называем конечный вектор «признаками».
# we use the OneHotEncoderEstimator from MLlib in spark to convert #aech v=categorical feature into one-hot vectors # next, we use VectorAssembler to combine the resulted one-hot ector #and the rest of numerical features into a # single vector column. we append every step of the process in a #stages array from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler stages = [] for categoricalCol in cat_cols: stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index') encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"]) stages += [stringIndexer, encoder] assemblerInputs = [c + "classVec" for c in cat_cols] + num_cols assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features") stages += [assembler]
А теперь давайте поместим все в конвейер. здесь мы выполняем последовательность преобразований, поэтому мы делаем их все сразу, используя конвейер:
# we use a pipeline to apply all the stages of transformation from pyspark.ml import Pipeline cols = new_df.columns pipeline = Pipeline(stages = stages) pipelineModel = pipeline.fit(new_df) new_df = pipelineModel.transform(new_df) selectedCols = ['features']+cols new_df = new_df.select(selectedCols) pd.DataFrame(new_df.take(5), columns=new_df.columns)
вот как наш новый набор данных выглядит после разработки функций:
Обучение и настройка гиперпараметров
для обучения мы сначала разбиваем набор данных на обучающий и тестовый наборы. затем мы начинаем обучение с использованием логистической регрессии, потому что она хорошо справляется с задачами двоичной классификации.
# split the data into trainign and testin sets train, test = new_df.randomSplit([0.80, 0.20], seed = 42) print(train.count()) print(test.count()) # first we check how LogisticRegression perform from pyspark.ml.classification import LogisticRegression LR = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=15) LR_model = LR.fit(train)
мы собираемся построить кривую ROC для данных обучения, чтобы увидеть, как выполняется логистическая регрессия, а затем мы будем использовать кривую Area Under ROC, которая является стандартной метрикой для оценки двоичной классификации, в качестве метрики для оценки моделей:
#plotting the ROC Curve trainingSummary = LR_model.summary roc = trainingSummary.roc.toPandas() plt.plot(roc['FPR'],roc['TPR']) plt.ylabel('False Positive Rate') plt.xlabel('True Positive Rate') plt.title('ROC Curve') plt.show() print('Training set ROC: ' + str(trainingSummary.areaUnderROC))
проверка работоспособности модели на тестовом наборе:
from pyspark.ml.evaluation import BinaryClassificationEvaluator predictions_LR = LR_model.transform(test) evaluator = BinaryClassificationEvaluator() print("Test_SET (Area Under ROC): " + str(evaluator.evaluate(predictions_LR, {evaluator.metricName: "areaUnderROC"}))) Test_SET (Area Under ROC): 0.7111434396856681
0,711 - неплохой результат для логистической регрессии. Затем мы пробуем другую модель, Деревья повышения градиента (GBT). это очень популярный метод классификации и регрессии, в котором используются ансамбли деревьев решений.
# next we checkout gradient boosting trees from pyspark.ml.classification import GBTClassifier gbt = GBTClassifier(maxIter=15) GBT_Model = gbt.fit(train) gbt_predictions = GBT_Model.transform(test) evaluator = BinaryClassificationEvaluator() print("Test_SET (Area Under ROC): " + str(evaluator.evaluate(gbt_predictions, {evaluator.metricName: "areaUnderROC"}))) Test_SET (Area Under ROC): 0.7322019340889893
мы смогли достичь гораздо лучшего результата, 0,732, используя GBT. В качестве последней стратегии мы реализуем настройку гиперпараметров с помощью поиска по сетке, а после этого мы запустим перекрестную проверку, чтобы улучшить производительность GBT.
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator paramGrid = (ParamGridBuilder() .addGrid(gbt.maxDepth, [2, 4, 6]) .addGrid(gbt.maxBins, [20, 30]) .addGrid(gbt.maxIter, [10, 15]) .build()) cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5) # Run cross validations. cvModel = cv.fit(train) gbt_cv_predictions = cvModel.transform(test) evaluator.evaluate(gbt_cv_predictions) CV_GBT (Area Under ROC) = 0.7368288195372332
результат был немного улучшен, а это означает, что мы все еще можем поиграть с настройкой гиперпараметров, чтобы увидеть, сможем ли мы еще больше улучшить результат.
В этом проекте мы построили модель сквозного машинного обучения (бинарная классификация с несбалансированными классами). и продемонстрировали мощь MLlib Apache Spark и то, как ее можно применить для сквозных проектов машинного обучения.
как всегда, код и блокнот jupyter доступны на моем Github.
Вопросы и комментарии приветствуются.
Использованная литература: