Учебник по PySpark для двоичной классификации
Введение
В этом руководстве показано, как создавать и запускать модели бинарной классификации PySpark от начала до конца.
Используемый здесь набор данных — это набор данных Заболевания сердца из Репозитория машинного обучения UCI (Janosi et. al., 1988). Единственная инструкция/лицензионная информация об этом наборе данных — это указание авторов, если он используется в публикации. Это набор данных бинарной классификации. Мы будем использовать его сегодня для построения различных моделей классификации с помощью PySpark.
Недавно я опубликовал это руководство, чтобы показать, как подключить сеанс Jupyter Notebook с локального компьютера к автономному кластеру Apache Spark, размещенному на Linux.
Сегодня я также покажу вам, как настроить и подключиться к автономному кластеру на вашем локальном ноутбуке/компьютере Mac для тех, у кого нет доступа к виртуальной машине. Конечно, может быть проще запустить локальный сеанс PySpark на вашем Mac без настройки кластера. Но если вы хотите использовать свой Mac в качестве главного узла и добавить рабочие узлы, это покажет вам, как это сделать.
Настройка автономного кластера
- Перейдите в папку %SPARK_HOME% в окне терминала на вашем Mac и запустите:
./sbin/start-master.sh
2. В другом окне терминала перейдите в папку %SPARK_HOME% и выполните:
./sbin/start-worker.sh spark://ip:port
Вы можете использовать эту же стратегию для добавления любых рабочих узлов в вашей сети к главному узлу.
Примечание. Вы можете получить spark://ip:port, если перейдете по адресу http://localhost:8080 в своем браузере после запуска главного узла.
Настройка блокнота Jupyter
- В другом окне терминала на вашем Mac запустите:
jupyter notebook
Это должно открыть сеанс Jupyter Notebook в браузере.
Подключение Jupyter Notebook к кластеру Spark.
1. Откройте новое ядро Python 3 в Jupyter Notebook и запустите:
import pyspark import findspark from pyspark import SparkConf, SparkContext from pyspark.sql import SQLContext from pyspark.sql.types import * from pyspark import SparkConf, SparkContextfindspark.init('/path_to_spark/spark-3.1.2-bin-hadoop3.2') #The key here is putting the path to the spark download on your Mac VM.sc=pyspark.SparkContext(master='spark://ip:port',appName='Heart_Disease_Example') #Use the same 'spark://ip:port' from connecting the worker(s) to the master node. spark = SQLContext(sc) spark
Теперь мы подключены к нашему кластеру Spark. Приступаем к моделированию.
Моделирование
Данные
Наш набор данных — это набор данных Заболевания сердца из репозитория машинного обучения UCI (Dubois 2008). Скачать csv-файл можно здесь.
from pyspark.sql import functions as F from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler from pyspark.ml.feature import CountVectorizer from pyspark.ml.tuning import ParamGridBuilder from pyspark.ml.evaluation import BinaryClassificationEvaluator import numpy as np from pyspark.ml.tuning import CrossValidator import plotly.graph_objects as go df=spark.read.csv('heart.csv', inferSchema=True, header=True) df.count() len(df.columns)
Наш набор данных имеет 303 строки и 14 столбцов. Да, Spark не нужен для набора данных такого размера. Этот небольшой набор данных предназначен исключительно для простоты запуска примеров моделей.
df.dtypes
Все наши данные представляют собой целые/двойные числа, поэтому нам не нужно кодировать какие-либо категориальные переменные.
Информация о функциях:
1. age:возраст человека в годах.
2. пол:пол человека (1 = мужской, 0 = женский)
3. cp:испытываемая боль в груди (0 = типичная стенокардия, 1 = атипичная стенокардия, 2 = неангинозная боль, 3 = бессимптомная)
4. trestbps: артериальное давление человека в состоянии покоя (мм рт. ст. при поступлении в больницу).
5. chol:измерение холестерина человека в мг/дл.
6. fbs: уровень сахара в крови человека натощак (> 120 мг/дл, 1 = правда; 0 = ложь).
7. restecg: электрокардиографическое измерение в состоянии покоя (0 = нормальное, 1 = наличие аномалии ST-T, 2 = выявление вероятной или достоверной гипертрофии левого желудочка по критериям Эстеса)
8. Талах: максимальная частота сердечных сокращений человека, достигнутая
9. exang: стенокардия, вызванная физической нагрузкой (1 = да; 0 = нет)
10. oldpeak: депрессия ST, вызванная физической нагрузкой, по сравнению с состоянием покоя.
11. наклон: наклон сегмента ST пикового упражнения (0 = восходящий, 1 = плоский, 2 = нисходящий)
12. ca: количество крупных сосудов (0–4)
13. thal: заболевание крови, называемое талассемией (3 = норма, 6 = постоянный дефект, 7 = обратимый дефект)
14. цель:сердечно-сосудистые заболевания (0 = нет, 1 = да)
Проверить наличие отсутствующих значений:
from pyspark.sql.functions import col,sum df.select(*(sum(col(c).isNull().cast("int")).alias(c) for c in df.columns)).show()
Большой. Нет пропущенных значений, поэтому нам не нужно выполнять какие-либо методы вменения.
Сводка набора данных
df.describe().toPandas().transpose()
Круговая диаграмма целевой переменной:
df2=df.toPandas() df22=df2.groupby('target').count().reset_index()[['target','age']].rename(columns={'age':'counts'}) colors = ['gold', 'mediumturquoise', 'darkorange', 'lightgreen'] fig = go.Figure(data=[go.Pie(labels=df22.target, values=df22.counts)]) fig.update_traces(hoverinfo='label+percent', textinfo='value+percent', textfont_size=20, textfont_color='black', marker=dict(colors=colors, line=dict(color='#000000', width=2))) # fig.show() fig.update_layout(title='Heart Disease vs. Absence of Heart Disease', title_x=0.5)
Гистограммы переменных характеристик:
from plotly.subplots import make_subplots fig = make_subplots(rows=4, cols=4, start_cell="top-left", subplot_titles=df2.columns[:-1]) fig.add_trace(go.Histogram(x=df2.age, name='age'), row=1, col=1) fig.add_trace(go.Histogram(x=df2.sex, name='sex'), row=1, col=2) fig.add_trace(go.Histogram(x=df2.cp, name='cp'), row=1, col=3) fig.add_trace(go.Histogram(x=df2.trestbps, name='trestbps'), row=1, col=4) fig.add_trace(go.Histogram(x=df2.chol, name='chol'), row=2, col=1) fig.add_trace(go.Histogram(x=df2.fbs, name='fbs'), row=2, col=2) fig.add_trace(go.Histogram(x=df2.restecg, name='restecg'), row=2, col=3) fig.add_trace(go.Histogram(x=df2.thalach, name='thalach'), row=2, col=4) fig.add_trace(go.Histogram(x=df2.exang, name='exang'), row=3, col=1) fig.add_trace(go.Histogram(x=df2.oldpeak, name='oldpeak'), row=3, col=2) fig.add_trace(go.Histogram(x=df2.slope, name='slope'), row=3, col=3) fig.add_trace(go.Histogram(x=df2.thalach, name='ca'), row=3, col=4) fig.add_trace(go.Histogram(x=df2.thal, name='thal'), row=4, col=1) fig.update_layout(title='Histograms of Variables', title_x=0.5)
Oldpeak явно перекошен вправо. Чтобы исправить это, я мог бы выполнить преобразование журнала (x + 1). Поскольку я больше склоняюсь к использованию древовидной модели для этой проблемы, меня меньше волнует преобразование данных для нормального распределения. Это потому, что это не предположение, например, случайных лесов или повышения градиента. Но для чего-то вроде логистической регрессии нормальное распределение данных является предположением. Давайте продолжим и преобразим его в любом случае.
df3=df.withColumn('oldpeaklog', F.log(df['oldpeak']+1)) df33=df3.toPandas() fig = make_subplots(rows=1, cols=2, start_cell="top-left", subplot_titles=['oldpeak','oldpeaklog']) fig.add_trace(go.Histogram(x=df33.oldpeak, name='oldpeak'), row=1, col=1) fig.add_trace(go.Histogram(x=df33.oldpeaklog, name='oldpeaklog'), row=1, col=2) fig.update_layout(title='Transforming oldpeak', title_x=0.5
После преобразования он выглядит более нормально распределенным.
Тепловая карта корреляционной матрицы:
corr = df33.corr() fig = go.Figure(data=go.Heatmap(z=corr.values, x=corr.index.values, y=corr.columns.values, text=np.round(corr.values,2), texttemplate=”%{text}”)) fig.update_layout(title=dict(text=’Correlation Matrix Heatmap’,font=dict(size=20), x=0.5))
Самая высокая корреляция составляет -0,59 между уклоном и старым пиком или -0,58 между наклоном и старым пиком. Все остальное ниже 0,5 или выше -0,5, т.е. ближе к 0. Таким образом, я решил оставить все переменные в моделях.
Инициализировать этапы
#Initialize stages stages = [] #Target column label_stringIdx = StringIndexer(inputCol = 'target', outputCol = 'label') stages += [label_stringIdx] #Numeric Columns numericCols = ['age', 'sex', 'cp', 'trestbps', 'chol', 'fbs', 'restecg', 'thalach', 'exang', 'slope', 'ca', 'thal', 'oldpeaklog'] #Create a vector assembler assemblerInputs = numericCols assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features").setHandleInvalid('keep') stages += [assembler]
Настроить воронку
from pyspark.ml import Pipeline pipeline = Pipeline(stages = stages) pipelineModel = pipeline.fit(df3) df3 = pipelineModel.transform(df3) selectedCols = ['label', 'features'] + ['age', 'sex', 'cp', 'trestbps', 'chol', 'fbs', 'restecg', 'thalach', 'exang', 'slope', 'ca', 'thal', 'oldpeaklog','target'] df3 = df3.select(selectedCols) df3.printSchema()
Разделить на обучение и тестирование
train, test = df3.randomSplit([0.7, 0.3], seed = 2018) train.groupby('target').count().show() test.groupby('target').count().show()
Классы выглядят относительно сбалансированными как для обучения, так и для теста. Таким образом, нам не нужно делать какую-либо балансировку классов. Давайте сделаем несколько моделей.
Модели
Случайные леса
from pyspark.ml.classification import RandomForestClassifier rf = RandomForestClassifier(featuresCol = 'features', labelCol = 'label', seed=101) rfModel = rf.fit(train) predictions_rf=rfModel.transform(test)
Общая точность модели
from pyspark.ml.evaluation import MulticlassClassificationEvaluator evaluator_rf = MulticlassClassificationEvaluator(predictionCol=”prediction”) evaluator_rf.evaluate(predictions_rf)
Матрица путаницы
predictions_rf.crosstab('label','prediction').show()
ROC и кривые точного отзыва
from handyspark import * # Creates instance of extended version of BinaryClassificationMetrics # using a DataFrame and its probability and label columns, as the output # from the classifier bcm = BinaryClassificationMetrics(predictions_rf, scoreCol='probability', labelCol='label') # Get metrics from evaluator print("Area under ROC Curve: {:.4f}".format(bcm.areaUnderROC)) print("Area under PR Curve: {:.4f}".format(bcm.areaUnderPR)) # Plot both ROC and PR curves fig, axs = plt.subplots(1, 2, figsize=(12, 4)) bcm.plot_roc_curve(ax=axs[0]) bcm.plot_pr_curve(ax=axs[1])
Тестирование различных пороговых значений
Вот функция для получения матрицы путаницы на основе любого введенного порога.
split1_udf = F.udf(lambda value: value[0].item(), FloatType()) split2_udf = F.udf(lambda value: value[1].item(), FloatType()) def test_threshold(model, prob): output2 = model.select('rawPrediction','target','probability',split1_udf('probability').alias('class_0'), split2_udf('probability').alias('class_1')) from pyspark.sql.functions import col, when output2=output2.withColumn('prediction', when(col('class_0')> prob, 1).otherwise(0)) output2.crosstab('prediction','target').show()
Используя эту пользовательскую функцию test_threshold, вы можете просмотреть матрицы путаницы в зависимости от того, хотите ли вы более высокую точность или более высокий отзыв и т. д.
test_threshold(predictions_rf,.6)
test_threshold(predictions_rf,.7)
Важность функции
feat_imps=rfModel.featureImportances x_values = list(range(len(feat_imps))) plt.bar(x_values, feat_imps, orientation = 'vertical') plt.xticks(x_values, ['age','sex','cp','trestbps','chol','fbs','restecg','thalach','exang','slope','ca','thal','oldpeaklog'], rotation=40) plt.ylabel('Importance') plt.xlabel('Feature') plt.title('Feature Importances')
Используя это, мы можем видеть, что наиболее важными переменными для прогноза являются cp, thalach, ca и oldpeaklog, именно в таком порядке.
Настроить гиперпараметры
paramGrid_rf = ParamGridBuilder() \ .addGrid(rf.numTrees, [int(x) for x in np.arange(200,221,10)]) \ .addGrid(rf.maxDepth, [int(x) for x in np.arange(10,11,10)]) \ .addGrid(rf.featureSubsetStrategy, [x for x in ["sqrt", "log2", "onethird"]]) \ .addGrid(rf.impurity, [x for x in ['gini','entropy']]) \ .addGrid(rf.maxBins, [int(x) for x in np.arange(22, 42, 10)]) \ .build() evaluator = BinaryClassificationEvaluator() rf_crossval = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid_rf, evaluator=evaluator, numFolds=3) rf_cvModel = rf_crossval.fit(train) predictions_rf_cv = rf_cvModel.transform(test)
Общая точность лучшей модели резюме
evaluator_rf_cv = MulticlassClassificationEvaluator(predictionCol="prediction") evaluator_rf_cv.evaluate(predictions_rf_cv)
Стало лучше! Повышение точности с 79,6% до 82,8%. Какие функции были наиболее важными и какие гиперпараметры использовались в лучшей модели?
Важность характеристик лучшей модели
import matplotlib.pyplot as plt feat_imps=rf_cvModel.bestModel.featureImportances x_values = list(range(len(feat_imps))) plt.bar(x_values, feat_imps, orientation = 'vertical') plt.xticks(x_values, ['age','sex','cp','trestbps','chol','fbs','restecg','thalach','exang','slope','ca','thal','oldpeaklog'], rotation=40) plt.ylabel('Importance') plt.xlabel('Feature') plt.title('Feature Importances')
Получить значения гиперпараметров лучшей CV-модели случайных лесов
print('Num Trees: ' + str(rf_cvModel.bestModel.getNumTrees)) print('Max Depth: ' + str(rf_cvModel.bestModel.getMaxDepth())) print('Feature Subset Strategy: ' + str(rf_cvModel.bestModel.getFeatureSubsetStrategy())) print('Impurity: ' + str(rf_cvModel.bestModel.getImpurity())) print('Max Bins: ' + str(rf_cvModel.bestModel.getMaxBins()))
Логистическая регрессия
from pyspark.ml.classification import LogisticRegression lr = LogisticRegression(maxIter=20, regParam=0.3, elasticNetParam=0,featuresCol = 'features', labelCol = 'label') lrModel = lr.fit(train) predictions_lr = lrModel.transform(test)
Общая точность модели
evaluator_lr = MulticlassClassificationEvaluator(predictionCol="prediction") evaluator_lr.evaluate(predictions_lr)
Распечатать коэффициенты и перехват для логистической регрессии
# Print the coefficients and intercept for logistic regression print("Coefficients: " + str(lrModel.coefficients)) print("Intercept: " + str(lrModel.intercept))
Матрица путаницы
predictions_lr.crosstab('label','prediction').show()
Кривые ROC и PR
# Creates instance of extended version of BinaryClassificationMetrics # using a DataFrame and its probability and label columns, as the output # from the classifier bcm = BinaryClassificationMetrics(predictions_lr, scoreCol='probability', labelCol='label') # Get metrics from evaluator print("Area under ROC Curve: {:.4f}".format(bcm.areaUnderROC)) print("Area under PR Curve: {:.4f}".format(bcm.areaUnderPR)) # Plot both ROC and PR curves fig, axs = plt.subplots(1, 2, figsize=(12, 4)) bcm.plot_roc_curve(ax=axs[0]) bcm.plot_pr_curve(ax=axs[1])
Настроить гиперпараметры
paramGrid_lr = ParamGridBuilder() \ .addGrid(lr.maxIter, [int(x) for x in np.arange(10,30,10)]) \ .addGrid(lr.regParam, [int(x) for x in np.arange(.1,.5,.1)]) \ .addGrid(lr.elasticNetParam, [int(x) for x in np.arange(0,.2,.1)]) \ .build() evaluator = BinaryClassificationEvaluator() lr_crossval = CrossValidator(estimator=lr, estimatorParamMaps=paramGrid_lr, evaluator=evaluator, numFolds=3) lr_cvModel = lr_crossval.fit(train) predictions_lr_cv = lr_cvModel.transform(test)
Общая точность лучшей модели резюме
evaluator_lr_cv = MulticlassClassificationEvaluator(predictionCol="prediction") evaluator_lr_cv.evaluate(predictions_lr_cv)
Похоже, точность стала лучше! Рост с 78,2% до 81,8%.
Логистическая регрессия, версия 2
Я нигде не смог найти в документах по логистической регрессии выше информацию о том, как получить коэффициенты вместе с P-значениями. Итак, я нашел эту отдельную модель логистической регрессии в пакете pyspark.ml.regression. Я сделал это, потому что, на мой взгляд, без p-значений коэффициенты бесполезны.
from pyspark.ml.regression import GeneralizedLinearRegression glr = GeneralizedLinearRegression(family="binomial", link="logit", maxIter=10, regParam=0.0) model = glr.fit(train) summary = model.summary print('Variables:' + str(train.columns[2:-1])) print("Coefficient Standard Errors: " + str(summary.coefficientStandardErrors)) print("T Values: " + str(summary.tValues)) print("P Values: " + str(summary.pValues))
Используя эти исходные документы, вы можете затем получить имена гиперпараметров и повторить шаги, описанные выше, для поиска по сетке и точности. Однако для этого примера я не буду повторять эти шаги и оставлю это вам.
Наивный байесовский анализ
from pyspark.ml.classification import NaiveBayes nb = NaiveBayes(featuresCol = 'features', labelCol = 'label') nb_model = nb.fit(train) predictions_nb=nb_model.transform(test)
Общая точность модели
evaluator_nb = MulticlassClassificationEvaluator(predictionCol="prediction") evaluator_nb.evaluate(predictions_nb)
Матрица путаницы
predictions_nb.crosstab('label','prediction').show()
Кривые ROC и PR
from handyspark import * from matplotlib import pyplot as plt %matplotlib inline # Creates instance of extended version of BinaryClassificationMetrics # using a DataFrame and its probability and label columns, as the output # from the classifier bcm = BinaryClassificationMetrics(predictions_nb, scoreCol='probability', labelCol='label') # Get metrics from evaluator print("Area under ROC Curve: {:.4f}".format(bcm.areaUnderROC)) print("Area under PR Curve: {:.4f}".format(bcm.areaUnderPR)) # Plot both ROC and PR curves fig, axs = plt.subplots(1, 2, figsize=(12, 4)) bcm.plot_roc_curve(ax=axs[0]) bcm.plot_pr_curve(ax=axs[1])
Настроить гиперпараметры
paramGrid_nb = ParamGridBuilder() \ .addGrid(nb.smoothing, [int(x) for x in np.arange(1,10,1)]) \ .build() evaluator = BinaryClassificationEvaluator() nb_crossval = CrossValidator(estimator=nb, estimatorParamMaps=paramGrid_nb, evaluator=evaluator, numFolds=3) nb_cvModel = nb_crossval.fit(train) predictions_nb_cv = nb_cvModel.transform(test)
Оцените лучшую модель резюме
evaluator_nb_cv = MulticlassClassificationEvaluator(predictionCol="prediction") evaluator_nb_cv.evaluate(predictions_nb_cv)
Заключение
Похоже, что случайные леса показали себя здесь лучше всего с точностью 82,8% после поиска по сетке. Логистическая регрессия была второй с 81,8%, а наивный байесовский анализ занял последнее место с 69,8%.
Давайте интерпретируем наиболее важные переменные для случайных лесов, поскольку они имеют наибольшую точность предсказания. Глядя на cp, thalach и ca:
cp:боль в груди (значение 1: типичная стенокардия, значение 2: атипичная стенокардия, значение 3: неангинозная боль, значение 4: бессимптомное течение)
df3.groupby('target','cp').count().show()
Random Forests сообщает, что большинство из тех, кто классифицируется как больной сердечным заболеванием, имеют cp=1/атипичная стенокардия и cp=2/не стенокардия, в то время как большинство тех, кто не классифицируется как больной сердечным заболеванием, имеют cp= 0/типичная стенокардия.
thalach:максимальная частота сердечных сокращений человека, достигнутая
df34=df33.loc[df33['target']==0,'thalach'] df35=df33.loc[df33['target']==1,'thalach'] fig = go.Figure() fig.add_trace(go.Histogram(x=df34, name='target:0')) fig.add_trace(go.Histogram(x=df35,name='target:1')) # Overlay both histograms fig.update_layout(barmode='overlay') # Reduce opacity to see both histograms fig.update_traces(opacity=.9) fig.update_xaxes(title='Thalach Level') fig.update_yaxes(dtick=5, range=[0,30], title='Count') fig.update_layout(title='Comparing Thalach Levels', title_x=0.5)
Согласно этому графику, более высокие уровни Талаха или максимальной частоты сердечных сокращений важны для классификации наличия болезни сердца по сравнению с более низкими уровнями для классификации как отсутствие болезни сердца.
ca: количество крупных сосудов (0–4).
df3.groupby('target','ca').count().show()
Random Forests сообщает, что у большинства из тех, кто классифицируется как больной сердечным заболеванием, нет крупных сосудов, в то время как у большинства тех, кто классифицируется как не страдающий сердечным заболеванием, больше крупных сосудов, то есть >0.
Распределение этих переменных кажется интуитивно понятным. Кроме того, мы смогли вывести это из наших моделей!
Надеюсь, этот пример моделей PySpark поможет вам в работе. Спасибо за чтение. Не стесняйтесь оставлять любые комментарии.
Ссылки:
Дюбуа, Кристофер Л. и Смит, П. (2008). Репозиторий сетевых данных UCI [http://networkdata.ics.uci.edu]. Ирвин, Калифорния: Калифорнийский университет, Школа информационных и компьютерных наук.
Яноши, Андрас, Стейнбрунн, Уильям, Пфистерер, Матиас, Детрано, Роберт и доктор медицины, доктор медицины (1988). Сердечное заболевание. Репозиторий машинного обучения UCI.