Для задачи двоичной классификации с несбалансированными классами

Вступление

Вычисления в памяти и параллельная обработка являются одними из основных причин того, что Apache Spark стал очень популярным в индустрии больших данных для работы с крупномасштабными продуктами данных и более быстрого анализа. MLlib, построенный на основе Spark, представляет собой масштабируемую библиотеку машинного обучения, которая обеспечивает как высококачественные алгоритмы, так и молниеносную скорость. Обладая отличными API-интерфейсами для Java, Python и Scala, он является лучшим выбором для аналитиков данных, инженеров данных и специалистов по данным. MLlib состоит из общих алгоритмов обучения и утилит, включая классификацию, регрессию, кластеризацию, совместную фильтрацию (матричную факторизацию), уменьшение размерности и т. Д.

Реализация

В этой статье мы собираемся построить сквозную модель машинного обучения с использованием MLlib в pySpark. мы собираемся использовать набор реальных данных из конкурса Риск дефолта Home Credit на kaggle. Цель этого конкурса состояла в том, чтобы определить, способны ли соискатели ссуды выплатить свои ссуды на основе данных, которые были собраны от каждого соискателя. целевая переменная - либо 0 (кандидаты, которые смогли выплатить свои ссуды), либо 1 (кандидаты, которые НЕ смогли выплатить свои ссуды). это проблема двоичной классификации с сильно несбалансированной целевой меткой. коэффициент распределения близок к 0,91–0,09, при этом 0,91 - это соотношение заявителей, которые смогли выплатить свои займы, а 0,09 - это соотношение заявителей, которые не смогли выплатить свои займы.

Давайте начнем с рассмотрения структуры нашего набора данных:

#we use the findspark library to locate spark on our local machine
import findspark
findspark.init('home Diredtory of Spark')
from pyspark.sql import SparkSession
# initiate our session and read the main CSV file, then we print the #dataframe schema
spark = SparkSession.builder.appName('imbalanced_binary_classification').getOrCreate()
new_df = spark.read.csv('application_train.csv', header=True, inferSchema=True)
new_df.printSchema()
root
 |-- SK_ID_CURR: integer (nullable = true)
 |-- TARGET: integer (nullable = true)
 |-- NAME_CONTRACT_TYPE: string (nullable = true)
 |-- CODE_GENDER: string (nullable = true)
 |-- FLAG_OWN_CAR: string (nullable = true)
 |-- FLAG_OWN_REALTY: string (nullable = true)
 |-- CNT_CHILDREN: integer (nullable = true)
 |-- AMT_INCOME_TOTAL: double (nullable = true)
 |-- AMT_CREDIT: double (nullable = true)
 |-- AMT_ANNUITY: double (nullable = true)
 |-- AMT_GOODS_PRICE: double (nullable = true)
 |-- NAME_TYPE_SUITE: string (nullable = true)
 |-- NAME_INCOME_TYPE: string (nullable = true)
 |-- NAME_EDUCATION_TYPE: string (nullable = true)
 |-- NAME_FAMILY_STATUS: string (nullable = true)
 |-- NAME_HOUSING_TYPE: string (nullable = true)
 |-- REGION_POPULATION_RELATIVE: double (nullable = true)
...

printSchema () показывает нам только имена столбцов и их тип данных. мы собираемся отбросить столбец SK_ID_CURR, переименовать столбец «TARGET» в «label» и посмотреть распределение нашей целевой переменной:

# Sk_ID_Curr is the id column which we dont need it in the process #so we get rid of it. and we rename the name of our 
# target variable to "label"
drop_col = ['SK_ID_CURR']
new_df = new_df.select([column for column in new_df.columns if column not in drop_col])
new_df = new_df.withColumnRenamed('TARGET', 'label')
new_df.groupby('label').count().toPandas()

мы можем визуализировать распределение меток с помощью matplotlib:

# let's have a look at the distribution of our target variable:
# to make it look better, we first convert our spark df to a Pandas
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline
df_pd = new_df.toPandas()
print(len(df_pd))
plt.figure(figsize=(12,10))
sns.countplot(x='label', data=df_pd, order=df_pd['label'].value_counts().index)

и вот как данные выглядят в формате фрейма данных Pandas:

# let's see how everything look in Pandas
import pandas as pd
pd.DataFrame(new_df.take(10), columns= new_df.columns)

Преодоление данных

Теперь, когда у нас есть некоторые идеи об общей структуре нашего набора данных, давайте продолжим анализ данных. Сначала мы проверяем, сколько у нас есть категориальных и числовых функций. Затем мы создаем функцию, которая выводит важную информацию об отсутствующих значениях в нашем наборе данных:

# now let's see how many categorical and numerical features we have:
cat_cols = [item[0] for item in new_df.dtypes if item[1].startswith('string')] 
print(str(len(cat_cols)) + '  categorical features')
num_cols = [item[0] for item in new_df.dtypes if item[1].startswith('int') | item[1].startswith('double')][1:]
print(str(len(num_cols)) + '  numerical features')
16  categorical features
104  numerical features

вот как мы получаем таблицу для отсутствующей информации:

# we use the below function to find more information about the #missing values
def info_missing_table(df_pd):
    """Input pandas dataframe and Return columns with missing value and percentage"""
    mis_val = df_pd.isnull().sum() #count total of null in each columns in dataframe
#count percentage of null in each columns
    mis_val_percent = 100 * df_pd.isnull().sum() / len(df_pd) 
    mis_val_table = pd.concat([mis_val, mis_val_percent], axis=1) 
 #join to left (as column) between mis_val and mis_val_percent
    mis_val_table_ren_columns = mis_val_table.rename(
    columns = {0 : 'Missing Values', 1 : '% of Total Values'}) 
#rename columns in table
    mis_val_table_ren_columns = mis_val_table_ren_columns[
    mis_val_table_ren_columns.iloc[:,1] != 0].sort_values('% of Total Values', ascending=False).round(1) 
        
    print ("Your selected dataframe has " + str(df_pd.shape[1]) + " columns.\n"    #.shape[1] : just view total columns in dataframe  
    "There are " + str(mis_val_table_ren_columns.shape[0]) +              
    " columns that have missing values.") #.shape[0] : just view total rows in dataframe
    return mis_val_table_ren_columns

missings = info_missing_table(df_pd)
missings

в 67 столбцах из 121 отсутствуют значения. и он не показывает их все на изображении, но в целом большинство из этих 67 столбцов имеют более 50% пропущенных значений. поэтому мы имеем дело с множеством пропущенных значений. мы собираемся заполнить числовые пропущенные значения средним значением каждого столбца, а категориальные пропущенные значения - наиболее частой категорией каждого столбца. но сначала давайте посчитаем недостающие значения в каждом столбце:

miss_counts = count_missings(new_df)
miss_counts
[('AMT_ANNUITY', 12),
 ('AMT_GOODS_PRICE', 278),
 ('NAME_TYPE_SUITE', 1292),
 ('OWN_CAR_AGE', 202929),
 ('OCCUPATION_TYPE', 96391),
 ('CNT_FAM_MEMBERS', 2),
 ('EXT_SOURCE_1', 173378),
 ('EXT_SOURCE_2', 660),
 ('EXT_SOURCE_3', 60965),
 ('APARTMENTS_AVG', 156061),
 ('BASEMENTAREA_AVG', 179943),
 ('YEARS_BEGINEXPLUATATION_AVG', 150007),
 ('YEARS_BUILD_AVG', 204488),
...

разделяем категориальные и числовые столбцы с пропущенными значениями:

# here we seperate missing columns in our new_df based on #categorical and numerical types
list_cols_miss=[x[0] for x in miss_counts]
df_miss= new_df.select(*list_cols_miss)
#categorical columns
catcolums_miss=[item[0] for item in df_miss.dtypes if item[1].startswith('string')]  #will select name of column with string data type
print("cateogrical columns_miss:", catcolums_miss)
### numerical columns
numcolumns_miss = [item[0] for item in df_miss.dtypes if item[1].startswith('int') | item[1].startswith('double')] #will select name of column with integer or double data type
print("numerical columns_miss:", numcolumns_miss)

далее заполняем недостающие значения:

# now that we have seperated the columns based on categorical and #numerical types, we will fill the missing categiracl 
# values with the most frequent category
from pyspark.sql.functions import rank,sum,col
df_Nomiss=new_df.na.drop()
for x in catcolums_miss:                  mode=df_Nomiss.groupBy(x).count().sort(col("count").desc()).collect()[0][0] 
    print(x, mode) #print name of columns and it's most categories 
    new_df = new_df.na.fill({x:mode})
# and we fill the missing numerical values with the average of each #column
from pyspark.sql.functions import mean, round
for i in numcolumns_miss:
    meanvalue = new_df.select(round(mean(i))).collect()[0][0] 
    print(i, meanvalue) 
    new_df=new_df.na.fill({i:meanvalue})

Теперь, когда в нашем наборе данных больше нет пропущенных значений, давайте поработаем над тем, как бороться с несбалансированными классами. существуют разные методы решения этой проблемы. один из способов - уменьшить выборку класса большинства или превзойти выборку класса меньшинства, чтобы получить более сбалансированные результаты. другой способ - присвоить веса каждому классу, чтобы наказать класс большинства, присвоив меньший вес, и повысить класс меньшинства, назначив больший вес. мы собираемся создать новый столбец в наборе данных с именем «веса» и назначить обратное соотношение для каждого класса в качестве весов. вот как это делается:

# adding the new column weights and fill it with ratios
from pyspark.sql.functions import when
ratio = 0.91
def weight_balance(labels):
    return when(labels == 1, ratio).otherwise(1*(1-ratio))
new_df = new_df.withColumn('weights', weight_balance(col('label')))

и вот как это выглядит после добавления столбца веса:

Функциональная инженерия

следующий шаг - Разработка функций. pySpark сделал это настолько простым, что нам не нужно много делать для извлечения функций. вот шаги:

  1. мы применяем StringIndexer () для присвоения индексов каждой категории в наших категориальных столбцах.
  2. мы применяем OneHotEncoderEstimator () для преобразования категориальных столбцов в векторы с однократным кодированием.
  3. и мы применяем VectorAssembler () для создания вектора признаков из всех категориальных и числовых признаков, и мы называем конечный вектор «признаками».
# we use the OneHotEncoderEstimator from MLlib in spark to convert #aech v=categorical feature into one-hot vectors
# next, we use VectorAssembler to combine the resulted one-hot ector #and the rest of numerical features into a 
# single vector column. we append every step of the process in a #stages array
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer, VectorAssembler
stages = []
for categoricalCol in cat_cols:
    stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
stages += [stringIndexer, encoder]
assemblerInputs = [c + "classVec" for c in cat_cols] + num_cols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

А теперь давайте поместим все в конвейер. здесь мы выполняем последовательность преобразований, поэтому мы делаем их все сразу, используя конвейер:

# we use a pipeline to apply all the stages of transformation
from pyspark.ml import Pipeline
cols = new_df.columns
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(new_df)
new_df = pipelineModel.transform(new_df)
selectedCols = ['features']+cols
new_df = new_df.select(selectedCols)
pd.DataFrame(new_df.take(5), columns=new_df.columns)

вот как наш новый набор данных выглядит после разработки функций:

Обучение и настройка гиперпараметров

для обучения мы сначала разбиваем набор данных на обучающий и тестовый наборы. затем мы начинаем обучение с использованием логистической регрессии, потому что она хорошо справляется с задачами двоичной классификации.

# split the data into trainign and testin sets
train, test = new_df.randomSplit([0.80, 0.20], seed = 42)
print(train.count())
print(test.count())
# first we check how LogisticRegression perform 
from pyspark.ml.classification import LogisticRegression
LR = LogisticRegression(featuresCol = 'features', labelCol = 'label', maxIter=15)
LR_model = LR.fit(train)

мы собираемся построить кривую ROC для данных обучения, чтобы увидеть, как выполняется логистическая регрессия, а затем мы будем использовать кривую Area Under ROC, которая является стандартной метрикой для оценки двоичной классификации, в качестве метрики для оценки моделей:

#plotting the ROC Curve
trainingSummary = LR_model.summary
roc = trainingSummary.roc.toPandas()
plt.plot(roc['FPR'],roc['TPR'])
plt.ylabel('False Positive Rate')
plt.xlabel('True Positive Rate')
plt.title('ROC Curve')
plt.show()
print('Training set ROC: ' + str(trainingSummary.areaUnderROC))

проверка работоспособности модели на тестовом наборе:

from pyspark.ml.evaluation import BinaryClassificationEvaluator
predictions_LR = LR_model.transform(test)
evaluator = BinaryClassificationEvaluator()
print("Test_SET (Area Under ROC): " + str(evaluator.evaluate(predictions_LR, {evaluator.metricName: "areaUnderROC"})))
Test_SET (Area Under ROC): 0.7111434396856681

0,711 - неплохой результат для логистической регрессии. Затем мы пробуем другую модель, Деревья повышения градиента (GBT). это очень популярный метод классификации и регрессии, в котором используются ансамбли деревьев решений.

# next we checkout gradient boosting trees
from pyspark.ml.classification import GBTClassifier
gbt = GBTClassifier(maxIter=15)
GBT_Model = gbt.fit(train)
gbt_predictions = GBT_Model.transform(test)
evaluator = BinaryClassificationEvaluator()
print("Test_SET (Area Under ROC): " + str(evaluator.evaluate(gbt_predictions, {evaluator.metricName: "areaUnderROC"})))
Test_SET (Area Under ROC): 0.7322019340889893

мы смогли достичь гораздо лучшего результата, 0,732, используя GBT. В качестве последней стратегии мы реализуем настройку гиперпараметров с помощью поиска по сетке, а после этого мы запустим перекрестную проверку, чтобы улучшить производительность GBT.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = (ParamGridBuilder()
             .addGrid(gbt.maxDepth, [2, 4, 6])
             .addGrid(gbt.maxBins, [20, 30])
             .addGrid(gbt.maxIter, [10, 15])
             .build())
cv = CrossValidator(estimator=gbt, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
# Run cross validations.
cvModel = cv.fit(train)
gbt_cv_predictions = cvModel.transform(test)
evaluator.evaluate(gbt_cv_predictions)
CV_GBT (Area Under ROC) = 0.7368288195372332

результат был немного улучшен, а это означает, что мы все еще можем поиграть с настройкой гиперпараметров, чтобы увидеть, сможем ли мы еще больше улучшить результат.

В этом проекте мы построили модель сквозного машинного обучения (бинарная классификация с несбалансированными классами). и продемонстрировали мощь MLlib Apache Spark и то, как ее можно применить для сквозных проектов машинного обучения.

как всегда, код и блокнот jupyter доступны на моем Github.

Вопросы и комментарии приветствуются.

Использованная литература:

  1. Https://github.com/elsyifa/Classification-Pyspark
  2. Https://spark.apache.org/docs/2.3.0/ml-classification-regression.html