Цель этого блога - продемонстрировать двоичную классификацию в pySpark. Различные этапы разработки модели классификации в pySpark следующие:

1) Инициализировать сеанс Spark

2) Загрузите и прочтите набор данных

3) Развитие начального понимания данных

4) Обработка пропущенных значений

5) Масштабирование функций

6) Тренировка тестового сплита

7) Обработка дисбаланса

8) Выбор функции

9) Оценка производительности

Особенности

  1. Обработка дисбаланса с использованием весов классов
  2. Настройка гиперпараметров с помощью ParamGrid
  3. K-кратная перекрестная проверка

Вы можете найти полный файл ipynb здесь

# Initializing a Spark session
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("diabeties").config("spark.some.config.option","some-value").getOrCreate()

Скачайте и прочтите набор данных

Для демонстрации я использую набор данных из базы данных диабета индейцев пима. Набор данных можно легко скачать по этой ссылке https://www.kaggle.com/abhikaggle8/pima-diabetes-classification/data

# Давайте начнем с чтения «diabale.csv» и создадим фрейм данных Spark с именем «raw_data»

raw_data = spark.read.format("csv").option("header","true").option("inferSchema", "true").load(r"diabetes.csv")
raw_data.columns

Это дает нам список столбцов

['Pregnancies', 'Glucose', 'BloodPressure', 'SkinThickness', 'Insulin', 'BMI','DiabetesPedigreeFunction', 'Age', 'Outcome']

Итак, данные содержат 9 столбцов. Каждый из них описывается следующим образом:

Наборы данных состоят из нескольких медицинских предикторов (независимых) и одной целевой (зависимой) переменной, результата. Независимые переменные включают количество беременностей у пациентки, их ИМТ, уровень инсулина, возраст и т. Д.

Столбцы

1) Беременности: количество беременностей (числовое значение).

2) Глюкоза: концентрация глюкозы в плазме через 2 часа в пероральном тесте на толерантность к глюкозе (числовой)

3) Артериальное давление: диастолическое артериальное давление (мм рт. Ст.) (Числовое значение)

4) Толщина кожи: толщина кожной складки трицепса (мм) (числовое значение)

5) Инсулин: 2-часовой сывороточный инсулин (мЕд / мл) (числовой)

6) ИМТ: индекс массы тела (вес в кг / (рост в м) ²) (числовой)

7) Функция родословной диабета: функция родословной диабета (числовая)

8) Возраст: Возраст человека (лет)

9) Результат: переменная класса (0 или 1)

Вопрос в том, чтобы создать модель машинного обучения, чтобы точно предсказать, страдают ли пациенты в наборе данных диабетом?

raw_data.describe().select("Summary","Pregnancies","Glucose","BloodPressure").show()

Это дает нам сводку рассмотренных полей, которая включает количество, среднее, стандартное отклонение, минимальное и максимальное значения, как показано ниже:

+-------+------------------+-----------------+------------------+
|Summary|       Pregnancies|          Glucose|     BloodPressure|
+-------+------------------+-----------------+------------------+
|  count|               768|              768|               768|
|   mean|3.8450520833333335|     120.89453125|       69.10546875|
| stddev|  3.36957806269887|31.97261819513622|19.355807170644777|
|    min|                 0|                0|                 0|
|    max|                17|              199|               122|
+-------+------------------+-----------------+------------------+
raw_data.describe().select("Summary","SkinThickness","Insulin").show()
+-------+------------------+------------------+
|Summary|     SkinThickness|           Insulin|
+-------+------------------+------------------+
|  count|               768|               768|
|   mean|20.536458333333332| 79.79947916666667|
| stddev|15.952217567727642|115.24400235133803|
|    min|                 0|                 0|
|    max|                99|               846|
+-------+------------------+------------------+
raw_data.describe().select("Summary","BMI","DiabetesPedigreeFunction","Age").show()
+-------+------------------+------------------------+---------------|Summary|BMI|DiabetesPedigreeFunction|          Age|
+-------+------------------+------------------------+---------------
|  count| 768   |    768             |          768|
|   mean|31.99  | 0.4718             |      33.240 |
| stddev| 7.8841|0.33132             |      11.7602|
|    min| 0.0   |0.078               |           21|
|    max|   67.1|2.42                |           81|
+-------+------------------+------------------------+---------------

Глядя на приведенные выше таблицы, можно заметить, что минимальное значение для полей, таких как «Беременность», «глюкоза», «артериальное давление», «толщина кожи», «инсулин» и «ИМТ» равны нулю (0), что кажется для меня непрактично (кроме «Беременности»).

Поэтому заменим все нули в вышеперечисленных полях (кроме «Беременности») на NaN.

import numpy as np
from pyspark.sql.functions import when
raw_data=raw_data.withColumn("Glucose",when(raw_data.Glucose==0,np.nan).otherwise(raw_data.Glucose))
raw_data=raw_data.withColumn("BloodPressure",when(raw_data.BloodPressure==0,np.nan).otherwise(raw_data.BloodPressure))
raw_data=raw_data.withColumn("SkinThickness",when(raw_data.SkinThickness==0,np.nan).otherwise(raw_data.SkinThickness))
raw_data=raw_data.withColumn("BMI",when(raw_data.BMI==0,np.nan).otherwise(raw_data.BMI))
raw_data=raw_data.withColumn("Insulin",when(raw_data.Insulin==0,np.nan).otherwise(raw_data.Insulin))
raw_data.select("Insulin","Glucose","BloodPressure","SkinThickness","BMI").show(5)

Итак, можно заметить, что все нули заменяются NaN.

+-------+-------+-------------+-------------+----+
|Insulin|Glucose|BloodPressure|SkinThickness| BMI|
+-------+-------+-------------+-------------+----+
|    NaN|  148.0|         72.0|         35.0|33.6|
|    NaN|   85.0|         66.0|         29.0|26.6|
|    NaN|  183.0|         64.0|          NaN|23.3|
|   94.0|   89.0|         66.0|         23.0|28.1|
|  168.0|  137.0|         40.0|         35.0|43.1|
+-------+-------+-------------+-------------+----+
only showing top 5 rows

Теперь мы можем просто вменить NaN, вызвав импьютер :)

from pyspark.ml.feature import Imputer
imputer=Imputer(inputCols=["Glucose","BloodPressure","SkinThickness","BMI","Insulin"],outputCols=["Glucose","BloodPressure","SkinThickness","BMI","Insulin"])
model=imputer.fit(raw_data)
raw_data=model.transform(raw_data)
raw_data.show(5)

Вдобавок, если мы видим столбец «Беременности» в raw_data, можно увидеть, что максимальное количество достигает 17, что совершенно невероятно. Это может быть выбросом, но мы обсудим обнаружение и удаление выбросов в другой раз.

Теперь давайте объединим все функции в один вектор признаков.

cols=raw_data.columns
cols.remove("Outcome")
# Let us import the vector assembler
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=cols,outputCol="features")
# Now let us use the transform method to transform our dataset
raw_data=assembler.transform(raw_data)
raw_data.select("features").show(truncate=False)

Одна строка вектора признаков выглядит следующим образом:

[6.0,148.0,72.0,35.0,155.5482233502538,33.6,0.627,50.0]

Вдобавок, если мы видим столбец «Беременности» в raw_data, можно увидеть, что максимальное количество достигает 17, что совершенно невероятно. Это может быть выбросом, но мы обсудим обнаружение и удаление выбросов в другой раз.

Стандартный скларизатор

Итак, мы создали вектор признаков. Теперь давайте воспользуемся StandardScaler, чтобы масштабировать только что созданный столбец «Feature».

from pyspark.ml.feature import StandardScaler
standardscaler=StandardScaler().setInputCol("features").setOutputCol("Scaled_features")
raw_data=standardscaler.fit(raw_data).transform(raw_data)
raw_data.select("features","Scaled_features").show(5)

`

+--------------------+--------------------+
|            features|     Scaled_features|
+--------------------+--------------------+
|[6.0,148.0,72.0,3...|[1.78063837321943...|
|[1.0,85.0,66.0,29...|[0.29677306220323...|
|[8.0,183.0,64.0,2...|[2.37418449762590...|
|[1.0,89.0,66.0,23...|[0.29677306220323...|
|[0.0,137.0,40.0,3...|[0.0,4.5012560836...|
+--------------------+--------------------+
only showing top 5 rows

Тренировка, тестовый сплит

Теперь, когда предварительная обработка данных завершена. Давайте разделим набор данных на набор для обучения и тестирования.

train, test = raw_data.randomSplit([0.8, 0.2], seed=12345)

Давайте проверим, есть ли у них дисбаланс в наборе данных

dataset_size=float(train.select("Outcome").count())
numPositives=train.select("Outcome").where('Outcome == 1').count()
per_ones=(float(numPositives)/float(dataset_size))*100
numNegatives=float(dataset_size-numPositives)
print('The number of ones are {}'.format(numPositives))
print('Percentage of ones are {}'.format(per_ones))

The number of ones are 206
Percentage of ones are 34.2762063228

Управление дисбалансом

Поскольку процент единиц в наборе данных составляет всего 34,27%, несомненно, это дисбаланс в наборе данных. К счастью, в случае логистической регрессии у нас есть метод, называемый взвешивание классов. Я рекомендую прочитать https://stackoverflow.com/questions/33372838/dealing-with-unbalanced-datasets-in-spark-mllib с целью понимания.

В нашем наборе данных (поезд) у нас 34,27% положительных и 65,73% отрицательных. Поскольку негативы в большинстве своем. Следовательно, целевая функция логистических потерь должна обрабатывать положительный класс (Результат == 1) с более высоким весом. Для этого мы рассчитываем BalancingRatio следующим образом:

BalancingRatio = numNegatives / dataset_size

Затем для каждого результата == 1 мы помещаем BalancingRatio в столбец «classWeights», а для каждого Outcome == 0 мы помещаем 1-BalancingRatio в столбец «classWeights».

Таким образом, мы присваиваем более высокий вес классу меньшинства (т.е. положительному классу).

BalancingRatio= numNegatives/dataset_size
print('BalancingRatio = {}'.format(BalancingRatio))

BalancingRatio = 0,657237936772

Теперь создаем новый столбец с именем «classWeights» в наборе данных «поезд».

train=train.withColumn("classWeights", when(train.Outcome == 1,BalancingRatio).otherwise(1-BalancingRatio))
train.select("classWeights").show(5)

Выбор функции

Мы используем ChiSqSelector, предоставляемый Spark ML, для выбора важных функций. Пожалуйста, обратитесь к моему предыдущему блогу для получения более подробной информации о работе ChiSqSelector.

# Feature selection using chisquareSelector
from pyspark.ml.feature import ChiSqSelector
css = ChiSqSelector(featuresCol='Scaled_features',outputCol='Aspect',labelCol='Outcome',fpr=0.05)
train=css.fit(train).transform(train)
test=css.fit(test).transform(test)
test.select("Aspect").show(5,truncate=False)

Построение модели классификации с использованием логистической регрессии (LR)

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(labelCol="Outcome", featuresCol="Aspect",weightCol="classWeights",maxIter=10)
model=lr.fit(train)
predict_train=model.transform(train)
predict_test=model.transform(test)
predict_test.select("Outcome","prediction").show(10)

Пример сгенерированных прогнозов выглядит следующим образом

+-------+----------+
|Outcome|prediction|
+-------+----------+
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      1|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       0.0|
|      0|       1.0|
+-------+----------+
only showing top 10 rows

Оценка модели

Теперь давайте оценим модель с помощью класса BinaryClassificationEvaluator в Spark ML. BinaryClassificationEvaluator по умолчанию использует areaUnderROC в качестве показателя производительности. Подробнее о areaUnderROC здесь

from pyspark.ml.evaluation import BinaryClassificationEvaluator
evaluator=BinaryClassificationEvaluator(rawPredictionCol=”rawPrediction”,labelCol=”Outcome”)
predict_test.select("Outcome","rawPrediction","prediction","probability").show(5)
print("The area under ROC for train set is {}".format(evaluator.evaluate(predict_train)))
print("The area under ROC for test set is {}".format(evaluator.evaluate(predict_test)))

Результат выглядит следующим образом:

+-------+--------------------+----------+--------------------+
|Outcome|       rawPrediction|prediction|         probability|
+-------+--------------------+----------+--------------------+
|      0|[2.58888023648551...|       0.0|[0.93014249279728...|
|      0|[2.32098145085482...|       0.0|[0.91059987057264...|
|      0|[1.68081620680194...|       0.0|[0.84301258010142...|
|      0|[0.64946166218389...|       0.0|[0.65688913922505...|
|      0|[1.78997774283908...|       0.0|[0.85692454770533...|
+-------+--------------------+----------+--------------------+
only showing top 5 rows
The area under ROC for train set is 0.838687476957
The area under ROC for test set is 0.844700460829

Гиперпараметры

К этому моменту мы разработали модель классификации с использованием логистической регрессии. Однако работа логистической регрессии зависит от ряда параметров. На данный момент мы работали только с параметрами по умолчанию. Теперь давайте попробуем настроить гиперпараметры и посмотрим, изменится ли это.

Список настраиваемых параметров в LR

1) aggregationDepth: предлагаемая глубина для treeAggregate (›= 2). (по умолчанию: 2)

2) elasticNetParam: параметр смешивания ElasticNet в диапазоне [0, 1]. Если альфа = 0, штраф - это штраф L2. Для альфа = 1 это штраф L1. (по умолчанию: 0,0)

3) family: название семейства, которое является описанием распределения меток, которое будет использоваться в модели. Поддерживаемые параметры: авто, биномиальное, полиномиальное (по умолчанию: авто)

4) featuresCol: имя столбца функций. (по умолчанию: особенности, текущее: аспект)

5) fitIntercept: подходит ли член перехвата. (по умолчанию: True)

6) labelCol: имя столбца метки. (по умолчанию: метка, текущая: результат)

7) maxIter: максимальное количество итераций (›= 0). (по умолчанию: 100, текущая: 10)

8) predictionCol: имя столбца прогноза. (по умолчанию: прогноз)

9) вероятностьCol: имя столбца для прогнозируемых условных вероятностей класса. Примечание. Не все модели дают хорошо откалиброванные оценки вероятности! Эти вероятности следует рассматривать как достоверные, а не точные вероятности. (по умолчанию: вероятность)

10) rawPredictionCol: имя столбца необработанного прогноза (также известного как уверенность). (по умолчанию: rawPrediction)

11) regParam: параметр регуляризации (›= 0). (по умолчанию: 0,0)

12) стандартизация: нужно ли стандартизировать функции обучения перед подгонкой модели. (по умолчанию: True)

13) порог: порог предсказания двоичной классификации в диапазоне [0, 1].

14) Если заданы и порог, и пороги, они должны совпадать. если порог равен p, то пороги должны быть равны [1-p, p]. (по умолчанию: 0,5)

15) пороговые значения: пороговые значения в классификации нескольких классов для настройки вероятности предсказания каждого класса. Массив должен иметь длину, равную количеству классов, со значениями ›0, за исключением того, что не более одного значения может быть 0. Прогнозируется класс с наибольшим значением p / t, где p - исходная вероятность этого класса, а t - это порог класса. (неопределенный)

16) tol: допуск сходимости итерационных алгоритмов (›= 0). (по умолчанию: 1e-06)

17) weightCol: название столбца веса. Если он не установлен или не пуст, мы обрабатываем все веса экземпляра как 1.0. (current: classWeights)

Теперь давайте настроим некоторые из этих параметров и посмотрим, как они влияют на производительность алгоритма.

Для настройки гиперпараметров мы будем рассматривать следующие параметры:

1) aggregationDepth [2, 5, 10]

2) elasticNetParam [0,0, 0,5, 1,0]

3) fitIntercept [True / False]

4) максИтер [10, 100, 1000]

5) regParam [0,01, 0,5, 2,0]

Определим сетку параметров следующим образом:

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = ParamGridBuilder()\
    .addGrid(lr.aggregationDepth,[2,5,10])\
    .addGrid(lr.elasticNetParam,[0.0, 0.5, 1.0])\
    .addGrid(lr.fitIntercept,[False, True])\
    .addGrid(lr.maxIter,[10, 100, 1000])\
    .addGrid(lr.regParam,[0.01, 0.5, 2.0]) \
    .build()

K-кратная перекрестная проверка

# Create 5-fold CrossValidator
cv = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=5)
# Run cross validations
cvModel = cv.fit(train)
# this will likely take a fair amount of time because of the amount of models that we're creating and testing
predict_train=cvModel.transform(train)
predict_test=cvModel.transform(test)
print("The area under ROC for train set after CV  is {}".format(evaluator.evaluate(predict_train)))
print("The area under ROC for test set after CV  is {}".format(evaluator.evaluate(predict_test)))

Результат выглядит следующим образом:

The area under ROC for train set after CV  is 0.843382081848
The area under ROC for test set after CV  is 0.846697388633

Для этой проблемы мы не увидели каких-либо значительных улучшений в метрике производительности после настройки гиперпараметров. Похоже, параметры по умолчанию хорошо сработали для этой проблемы. Однако настройка гиперпараметров является важным аспектом при решении проблем с машинным обучением, и ее нельзя игнорировать.

Будущая работа:

Осталось еще несколько важных вещей, о которых я не рассказал в этом блоге:

1) Обнаружение выбросов

2) Обработка дисбаланса

Методика взвешивания классов, которую мы использовали в этой работе, в настоящее время подходит только для логистической регрессии. Однако в случае других алгоритмов Random Forest, Naive Bayes, Support Vector Machine нам может потребоваться использовать такие методы, как Synthetic Minority Oversampling Technique (SMOTE).

Полезные ссылки

Https://spark.apache.org/docs/2.1.0/ml-tuning.html

Https://spark.apache.org/docs/2.2.0/mllib-evaluation-metrics.html

Https://docs.databricks.com/spark/latest/mllib/binary-classification-mllib-pipelines.html