Учебник по PySpark для двоичной классификации

Введение

В этом руководстве показано, как создавать и запускать модели бинарной классификации PySpark от начала до конца.

Используемый здесь набор данных — это набор данных Заболевания сердца из Репозитория машинного обучения UCI (Janosi et. al., 1988). Единственная инструкция/лицензионная информация об этом наборе данных — это указание авторов, если он используется в публикации. Это набор данных бинарной классификации. Мы будем использовать его сегодня для построения различных моделей классификации с помощью PySpark.

Недавно я опубликовал это руководство, чтобы показать, как подключить сеанс Jupyter Notebook с локального компьютера к автономному кластеру Apache Spark, размещенному на Linux.

Сегодня я также покажу вам, как настроить и подключиться к автономному кластеру на вашем локальном ноутбуке/компьютере Mac для тех, у кого нет доступа к виртуальной машине. Конечно, может быть проще запустить локальный сеанс PySpark на вашем Mac без настройки кластера. Но если вы хотите использовать свой Mac в качестве главного узла и добавить рабочие узлы, это покажет вам, как это сделать.

Настройка автономного кластера

  1. Перейдите в папку %SPARK_HOME% в окне терминала на вашем Mac и запустите:
./sbin/start-master.sh

2. В другом окне терминала перейдите в папку %SPARK_HOME% и выполните:

./sbin/start-worker.sh spark://ip:port

Вы можете использовать эту же стратегию для добавления любых рабочих узлов в вашей сети к главному узлу.

Примечание. Вы можете получить spark://ip:port, если перейдете по адресу http://localhost:8080 в своем браузере после запуска главного узла.

Настройка блокнота Jupyter

  1. В другом окне терминала на вашем Mac запустите:
jupyter notebook 

Это должно открыть сеанс Jupyter Notebook в браузере.

Подключение Jupyter Notebook к кластеру Spark.

1. Откройте новое ядро ​​Python 3 в Jupyter Notebook и запустите:

import pyspark
import findspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark import SparkConf, SparkContextfindspark.init('/path_to_spark/spark-3.1.2-bin-hadoop3.2') 
#The key here is putting the path to the spark download on your Mac
VM.sc=pyspark.SparkContext(master='spark://ip:port',appName='Heart_Disease_Example')
#Use the same 'spark://ip:port' from connecting the worker(s) to the master node. 
spark = SQLContext(sc)
spark

Теперь мы подключены к нашему кластеру Spark. Приступаем к моделированию.

Моделирование

Данные

Наш набор данных — это набор данных Заболевания сердца из репозитория машинного обучения UCI (Dubois 2008). Скачать csv-файл можно здесь.

from pyspark.sql import functions as F
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator
import numpy as np
from pyspark.ml.tuning import CrossValidator
import plotly.graph_objects as go
df=spark.read.csv('heart.csv', inferSchema=True, header=True)
df.count()
len(df.columns)

Наш набор данных имеет 303 строки и 14 столбцов. Да, Spark не нужен для набора данных такого размера. Этот небольшой набор данных предназначен исключительно для простоты запуска примеров моделей.

df.dtypes

Все наши данные представляют собой целые/двойные числа, поэтому нам не нужно кодировать какие-либо категориальные переменные.

Информация о функциях:

1. age:возраст человека в годах.

2. пол:пол человека (1 = мужской, 0 = женский)

3. cp:испытываемая боль в груди (0 = типичная стенокардия, 1 = атипичная стенокардия, 2 = неангинозная боль, 3 = бессимптомная)

4. trestbps: артериальное давление человека в состоянии покоя (мм рт. ст. при поступлении в больницу).

5. chol:измерение холестерина человека в мг/дл.

6. fbs: уровень сахара в крови человека натощак (> 120 мг/дл, 1 = правда; 0 = ложь).

7. restecg: электрокардиографическое измерение в состоянии покоя (0 = нормальное, 1 = наличие аномалии ST-T, 2 = выявление вероятной или достоверной гипертрофии левого желудочка по критериям Эстеса)

8. Талах: максимальная частота сердечных сокращений человека, достигнутая

9. exang: стенокардия, вызванная физической нагрузкой (1 = да; 0 = нет)

10. oldpeak: депрессия ST, вызванная физической нагрузкой, по сравнению с состоянием покоя.

11. наклон: наклон сегмента ST пикового упражнения (0 = восходящий, 1 = плоский, 2 = нисходящий)

12. ca: количество крупных сосудов (0–4)

13. thal: заболевание крови, называемое талассемией (3 = норма, 6 = постоянный дефект, 7 = обратимый дефект)

14. цель:сердечно-сосудистые заболевания (0 = нет, 1 = да)

Проверить наличие отсутствующих значений:

from pyspark.sql.functions import col,sum
df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()

Большой. Нет пропущенных значений, поэтому нам не нужно выполнять какие-либо методы вменения.

Сводка набора данных

df.describe().toPandas().transpose()

Круговая диаграмма целевой переменной:

df2=df.toPandas()
df22=df2.groupby('target').count().reset_index()[['target','age']].rename(columns={'age':'counts'})
colors = ['gold', 'mediumturquoise', 'darkorange', 'lightgreen']
fig = go.Figure(data=[go.Pie(labels=df22.target,
                             values=df22.counts)])
fig.update_traces(hoverinfo='label+percent', textinfo='value+percent', textfont_size=20, textfont_color='black',
                  marker=dict(colors=colors, line=dict(color='#000000', width=2)))
# fig.show()
fig.update_layout(title='Heart Disease vs. Absence of Heart Disease', title_x=0.5)

Гистограммы переменных характеристик:

from plotly.subplots import make_subplots
fig = make_subplots(rows=4, cols=4, start_cell="top-left",
                   subplot_titles=df2.columns[:-1])
fig.add_trace(go.Histogram(x=df2.age, name='age'),
              row=1, col=1)
fig.add_trace(go.Histogram(x=df2.sex, name='sex'),
              row=1, col=2)
fig.add_trace(go.Histogram(x=df2.cp, name='cp'),
              row=1, col=3)
fig.add_trace(go.Histogram(x=df2.trestbps, name='trestbps'),
              row=1, col=4)
fig.add_trace(go.Histogram(x=df2.chol, name='chol'),
              row=2, col=1)
fig.add_trace(go.Histogram(x=df2.fbs, name='fbs'),
              row=2, col=2)
fig.add_trace(go.Histogram(x=df2.restecg, name='restecg'),
              row=2, col=3)
fig.add_trace(go.Histogram(x=df2.thalach, name='thalach'),
              row=2, col=4)
fig.add_trace(go.Histogram(x=df2.exang, name='exang'),
              row=3, col=1)
fig.add_trace(go.Histogram(x=df2.oldpeak, name='oldpeak'),
              row=3, col=2)
fig.add_trace(go.Histogram(x=df2.slope, name='slope'),
              row=3, col=3)
fig.add_trace(go.Histogram(x=df2.thalach, name='ca'),
              row=3, col=4)
fig.add_trace(go.Histogram(x=df2.thal, name='thal'),
              row=4, col=1)
fig.update_layout(title='Histograms of Variables', title_x=0.5)

Oldpeak явно перекошен вправо. Чтобы исправить это, я мог бы выполнить преобразование журнала (x + 1). Поскольку я больше склоняюсь к использованию древовидной модели для этой проблемы, меня меньше волнует преобразование данных для нормального распределения. Это потому, что это не предположение, например, случайных лесов или повышения градиента. Но для чего-то вроде логистической регрессии нормальное распределение данных является предположением. Давайте продолжим и преобразим его в любом случае.

df3=df.withColumn('oldpeaklog', F.log(df['oldpeak']+1))
df33=df3.toPandas()
fig = make_subplots(rows=1, cols=2, start_cell="top-left",
                   subplot_titles=['oldpeak','oldpeaklog'])
fig.add_trace(go.Histogram(x=df33.oldpeak, name='oldpeak'),
              row=1, col=1)
fig.add_trace(go.Histogram(x=df33.oldpeaklog, name='oldpeaklog'),
              row=1, col=2)
fig.update_layout(title='Transforming oldpeak', title_x=0.5

После преобразования он выглядит более нормально распределенным.

Тепловая карта корреляционной матрицы:

corr = df33.corr()
fig = go.Figure(data=go.Heatmap(z=corr.values,
 x=corr.index.values,
 y=corr.columns.values,
 text=np.round(corr.values,2),
 texttemplate=”%{text}”))
fig.update_layout(title=dict(text=’Correlation Matrix Heatmap’,font=dict(size=20), x=0.5))

Самая высокая корреляция составляет -0,59 между уклоном и старым пиком или -0,58 между наклоном и старым пиком. Все остальное ниже 0,5 или выше -0,5, т.е. ближе к 0. Таким образом, я решил оставить все переменные в моделях.

Инициализировать этапы

#Initialize stages
stages = []
#Target column
label_stringIdx = StringIndexer(inputCol = 'target', outputCol = 'label')
stages += [label_stringIdx]
#Numeric Columns
numericCols = ['age',
 'sex',
 'cp',
 'trestbps',
 'chol',
 'fbs',
 'restecg',
 'thalach',
 'exang',
 'slope',
 'ca',
 'thal',
 'oldpeaklog'] 
#Create a vector assembler
assemblerInputs = numericCols 
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features").setHandleInvalid('keep')
stages += [assembler]

Настроить воронку

from pyspark.ml import Pipeline
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df3)
df3 = pipelineModel.transform(df3)
selectedCols = ['label', 'features'] + 
['age',
 'sex',
 'cp',
 'trestbps',
 'chol',
 'fbs',
 'restecg',
 'thalach',
 'exang',
 'slope',
 'ca',
 'thal',
 'oldpeaklog','target']
df3 = df3.select(selectedCols)
df3.printSchema()

Разделить на обучение и тестирование

train, test = df3.randomSplit([0.7, 0.3], seed = 2018)
train.groupby('target').count().show()
test.groupby('target').count().show()

Классы выглядят относительно сбалансированными как для обучения, так и для теста. Таким образом, нам не нужно делать какую-либо балансировку классов. Давайте сделаем несколько моделей.

Модели

Случайные леса

from pyspark.ml.classification import RandomForestClassifier
rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label', seed=101)
rfModel = rf.fit(train)
predictions_rf=rfModel.transform(test)

Общая точность модели

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator_rf = MulticlassClassificationEvaluator(predictionCol=”prediction”)
evaluator_rf.evaluate(predictions_rf)

Матрица путаницы

predictions_rf.crosstab('label','prediction').show()

ROC и кривые точного отзыва

from handyspark import *
# Creates instance of extended version of BinaryClassificationMetrics
# using a DataFrame and its probability and label columns, as the output
# from the classifier
bcm = BinaryClassificationMetrics(predictions_rf, scoreCol='probability', labelCol='label')
# Get metrics from evaluator
print("Area under ROC Curve: {:.4f}".format(bcm.areaUnderROC))
print("Area under PR Curve: {:.4f}".format(bcm.areaUnderPR))
# Plot both ROC and PR curves
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
bcm.plot_roc_curve(ax=axs[0])
bcm.plot_pr_curve(ax=axs[1])

Тестирование различных пороговых значений

Вот функция для получения матрицы путаницы на основе любого введенного порога.

split1_udf = F.udf(lambda value: value[0].item(), FloatType())
split2_udf = F.udf(lambda value: value[1].item(), FloatType())
def test_threshold(model, prob):
    output2 = model.select('rawPrediction','target','probability',split1_udf('probability').alias('class_0'), split2_udf('probability').alias('class_1'))
    from pyspark.sql.functions import col, when
    output2=output2.withColumn('prediction', when(col('class_0')> prob, 1).otherwise(0))
    output2.crosstab('prediction','target').show()

Используя эту пользовательскую функцию test_threshold, вы можете просмотреть матрицы путаницы в зависимости от того, хотите ли вы более высокую точность или более высокий отзыв и т. д.

test_threshold(predictions_rf,.6)

test_threshold(predictions_rf,.7)

Важность функции

feat_imps=rfModel.featureImportances
x_values = list(range(len(feat_imps)))
plt.bar(x_values, feat_imps, orientation = 'vertical')
plt.xticks(x_values, ['age','sex','cp','trestbps','chol','fbs','restecg','thalach','exang','slope','ca','thal','oldpeaklog'], rotation=40)
plt.ylabel('Importance')
plt.xlabel('Feature')
plt.title('Feature Importances')

Используя это, мы можем видеть, что наиболее важными переменными для прогноза являются cp, thalach, ca и oldpeaklog, именно в таком порядке.

Настроить гиперпараметры

paramGrid_rf = ParamGridBuilder() \
    .addGrid(rf.numTrees, [int(x) for x in np.arange(200,221,10)]) \
    .addGrid(rf.maxDepth, [int(x) for x in np.arange(10,11,10)]) \
    .addGrid(rf.featureSubsetStrategy, [x for x in ["sqrt", "log2", "onethird"]]) \
    .addGrid(rf.impurity, [x for x in ['gini','entropy']]) \
    .addGrid(rf.maxBins, [int(x) for x in np.arange(22, 42, 10)]) \
    .build()
evaluator = BinaryClassificationEvaluator()
rf_crossval = CrossValidator(estimator=rf,
                          estimatorParamMaps=paramGrid_rf,
                          evaluator=evaluator,
                          numFolds=3)
rf_cvModel = rf_crossval.fit(train)
predictions_rf_cv = rf_cvModel.transform(test)

Общая точность лучшей модели резюме

evaluator_rf_cv = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator_rf_cv.evaluate(predictions_rf_cv)

Стало лучше! Повышение точности с 79,6% до 82,8%. Какие функции были наиболее важными и какие гиперпараметры использовались в лучшей модели?

Важность характеристик лучшей модели

import matplotlib.pyplot as plt
feat_imps=rf_cvModel.bestModel.featureImportances
x_values = list(range(len(feat_imps)))
plt.bar(x_values, feat_imps, orientation = 'vertical')
plt.xticks(x_values, ['age','sex','cp','trestbps','chol','fbs','restecg','thalach','exang','slope','ca','thal','oldpeaklog'], rotation=40)
plt.ylabel('Importance')
plt.xlabel('Feature')
plt.title('Feature Importances')

Получить значения гиперпараметров лучшей CV-модели случайных лесов

print('Num Trees: ' + str(rf_cvModel.bestModel.getNumTrees))
print('Max Depth: ' + str(rf_cvModel.bestModel.getMaxDepth()))
print('Feature Subset Strategy: ' + str(rf_cvModel.bestModel.getFeatureSubsetStrategy()))
print('Impurity: ' + str(rf_cvModel.bestModel.getImpurity()))
print('Max Bins: ' + str(rf_cvModel.bestModel.getMaxBins()))

Логистическая регрессия

from pyspark.ml.classification import LogisticRegression
lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0,featuresCol = 'features', labelCol = 'label')
lrModel = lr.fit(train)
predictions_lr = lrModel.transform(test)

Общая точность модели

evaluator_lr = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator_lr.evaluate(predictions_lr)

Распечатать коэффициенты и перехват для логистической регрессии

# Print the coefficients and intercept for logistic regression
print("Coefficients: " + str(lrModel.coefficients))
print("Intercept: " + str(lrModel.intercept))

Матрица путаницы

predictions_lr.crosstab('label','prediction').show()

Кривые ROC и PR

# Creates instance of extended version of BinaryClassificationMetrics
# using a DataFrame and its probability and label columns, as the output
# from the classifier
bcm = BinaryClassificationMetrics(predictions_lr, scoreCol='probability', labelCol='label')
# Get metrics from evaluator
print("Area under ROC Curve: {:.4f}".format(bcm.areaUnderROC))
print("Area under PR Curve: {:.4f}".format(bcm.areaUnderPR))
# Plot both ROC and PR curves
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
bcm.plot_roc_curve(ax=axs[0])
bcm.plot_pr_curve(ax=axs[1])

Настроить гиперпараметры

paramGrid_lr = ParamGridBuilder() \
    .addGrid(lr.maxIter, [int(x) for x in np.arange(10,30,10)]) \
    .addGrid(lr.regParam, [int(x) for x in np.arange(.1,.5,.1)]) \
    .addGrid(lr.elasticNetParam, [int(x) for x in np.arange(0,.2,.1)]) \
    .build()
evaluator = BinaryClassificationEvaluator()
lr_crossval = CrossValidator(estimator=lr,
                          estimatorParamMaps=paramGrid_lr,
                          evaluator=evaluator,
                          numFolds=3)
lr_cvModel = lr_crossval.fit(train)
predictions_lr_cv = lr_cvModel.transform(test)

Общая точность лучшей модели резюме

evaluator_lr_cv = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator_lr_cv.evaluate(predictions_lr_cv)

Похоже, точность стала лучше! Рост с 78,2% до 81,8%.

Логистическая регрессия, версия 2

Я нигде не смог найти в документах по логистической регрессии выше информацию о том, как получить коэффициенты вместе с P-значениями. Итак, я нашел эту отдельную модель логистической регрессии в пакете pyspark.ml.regression. Я сделал это, потому что, на мой взгляд, без p-значений коэффициенты бесполезны.

from pyspark.ml.regression import GeneralizedLinearRegression
glr = GeneralizedLinearRegression(family="binomial", link="logit", maxIter=10, 
regParam=0.0)
model = glr.fit(train)
summary = model.summary
print('Variables:' + str(train.columns[2:-1]))
print("Coefficient Standard Errors: " + str(summary.coefficientStandardErrors))
print("T Values: " + str(summary.tValues))
print("P Values: " + str(summary.pValues))

Используя эти исходные документы, вы можете затем получить имена гиперпараметров и повторить шаги, описанные выше, для поиска по сетке и точности. Однако для этого примера я не буду повторять эти шаги и оставлю это вам.

Наивный байесовский анализ

from pyspark.ml.classification import NaiveBayes
nb = NaiveBayes(featuresCol = 'features', labelCol = 'label')
nb_model = nb.fit(train)
predictions_nb=nb_model.transform(test)

Общая точность модели

evaluator_nb = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator_nb.evaluate(predictions_nb)

Матрица путаницы

predictions_nb.crosstab('label','prediction').show()

Кривые ROC и PR

from handyspark import *
from matplotlib import pyplot as plt
%matplotlib inline
# Creates instance of extended version of BinaryClassificationMetrics
# using a DataFrame and its probability and label columns, as the output
# from the classifier
bcm = BinaryClassificationMetrics(predictions_nb, scoreCol='probability', labelCol='label')
# Get metrics from evaluator
print("Area under ROC Curve: {:.4f}".format(bcm.areaUnderROC))
print("Area under PR Curve: {:.4f}".format(bcm.areaUnderPR))
# Plot both ROC and PR curves
fig, axs = plt.subplots(1, 2, figsize=(12, 4))
bcm.plot_roc_curve(ax=axs[0])
bcm.plot_pr_curve(ax=axs[1])

Настроить гиперпараметры

paramGrid_nb = ParamGridBuilder() \
    .addGrid(nb.smoothing, [int(x) for x in np.arange(1,10,1)]) \
    .build()
evaluator = BinaryClassificationEvaluator()
nb_crossval = CrossValidator(estimator=nb,
                          estimatorParamMaps=paramGrid_nb,
                          evaluator=evaluator,
                          numFolds=3)
nb_cvModel = nb_crossval.fit(train)
predictions_nb_cv = nb_cvModel.transform(test)

Оцените лучшую модель резюме

evaluator_nb_cv = MulticlassClassificationEvaluator(predictionCol="prediction")
evaluator_nb_cv.evaluate(predictions_nb_cv)

Заключение

Похоже, что случайные леса показали себя здесь лучше всего с точностью 82,8% после поиска по сетке. Логистическая регрессия была второй с 81,8%, а наивный байесовский анализ занял последнее место с 69,8%.

Давайте интерпретируем наиболее важные переменные для случайных лесов, поскольку они имеют наибольшую точность предсказания. Глядя на cp, thalach и ca:

cp:боль в груди (значение 1: типичная стенокардия, значение 2: атипичная стенокардия, значение 3: неангинозная боль, значение 4: бессимптомное течение)

df3.groupby('target','cp').count().show()

Random Forests сообщает, что большинство из тех, кто классифицируется как больной сердечным заболеванием, имеют cp=1/атипичная стенокардия и cp=2/не стенокардия, в то время как большинство тех, кто не классифицируется как больной сердечным заболеванием, имеют cp= 0/типичная стенокардия.

thalach:максимальная частота сердечных сокращений человека, достигнутая

df34=df33.loc[df33['target']==0,'thalach']
df35=df33.loc[df33['target']==1,'thalach']
fig = go.Figure()
fig.add_trace(go.Histogram(x=df34, name='target:0'))
fig.add_trace(go.Histogram(x=df35,name='target:1'))
# Overlay both histograms
fig.update_layout(barmode='overlay')
# Reduce opacity to see both histograms
fig.update_traces(opacity=.9)
fig.update_xaxes(title='Thalach Level')
fig.update_yaxes(dtick=5, range=[0,30], title='Count')
fig.update_layout(title='Comparing Thalach Levels', title_x=0.5)

Согласно этому графику, более высокие уровни Талаха или максимальной частоты сердечных сокращений важны для классификации наличия болезни сердца по сравнению с более низкими уровнями для классификации как отсутствие болезни сердца.

ca: количество крупных сосудов (0–4).

df3.groupby('target','ca').count().show()

Random Forests сообщает, что у большинства из тех, кто классифицируется как больной сердечным заболеванием, нет крупных сосудов, в то время как у большинства тех, кто классифицируется как не страдающий сердечным заболеванием, больше крупных сосудов, то есть >0.

Распределение этих переменных кажется интуитивно понятным. Кроме того, мы смогли вывести это из наших моделей!

Надеюсь, этот пример моделей PySpark поможет вам в работе. Спасибо за чтение. Не стесняйтесь оставлять любые комментарии.

Ссылки:

Дюбуа, Кристофер Л. и Смит, П. (2008). Репозиторий сетевых данных UCI [http://networkdata.ics.uci.edu]. Ирвин, Калифорния: Калифорнийский университет, Школа информационных и компьютерных наук.

Яноши, Андрас, Стейнбрунн, Уильям, Пфистерер, Матиас, Детрано, Роберт и доктор медицины, доктор медицины (1988). Сердечное заболевание. Репозиторий машинного обучения UCI.