Почему XGBoost?

XGBoost (eXtreme Gradient Boosting) — один из самых популярных и широко используемых алгоритмов машинного обучения специалистами по данным во всех отраслях. Также этот алгоритм очень эффективен с точки зрения сокращения времени вычислений и обеспечения оптимального использования ресурсов памяти, еще одной важной особенностью является обработка пропущенных значений при реализации и распараллеливание процесса обучения.

В настоящее время, из-за быстрого увеличения размера набора данных, распределенное обучение действительно важно, поэтому в этом блоге мы собираемся изучить, как кто-то может интегрировать XGBoost + PySpark и сделать обучение модели и подсчет очков.

Можно легко использовать доступный алгоритм ml внутри pyspark.ml или MLLib, но чтобы использовать XGBoost таким же образом, мы должны добавить несколько внешних зависимостей и python XGBoost. оболочки, другой способ — напрямую использовать фреймворк XGBoost native с PySpark, который не поддерживается последней версией XGBoost (единственное ограничение здесь — он поддерживается только в версии python ≥3.8), чтобы знаете, как использовать это с PySpark, ознакомьтесь с этим документом. (спасибо сообществу dmlc..)

но в этом блоге наше основное внимание сосредоточено на том, как интегрироваться с PySpark в версии python ‹ 3.8.

Прежде чем начать интеграцию, загрузите необходимые файлы по этой ссылке.

После того, как вы загрузили все 3 файла, мы готовы интегрировать XGBoost с PySpark, выполнив следующие шаги:

  1. Как указано в приведенном ниже примере кода, добавьте файлы JAR в раздел PYSPARK_SUBMIT_ARGS (не забудьте указать точный путь к местоположению файла)
  2. Создайте сеанс Spark.
  3. Добавьте файл кода оболочки Python XGBoost (файл .zip) в sparkContext. (мы делаем это для поддержки импорта XGBoost, снова убедитесь, что вы указали правильный путь к zip-файлу)
import os
import findspark
import numpy as np

## Add the path of the downloaded jar files
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars xgboost4j-spark.jar,xgboost4j.jar pyspark-shell'

findspark.init()

spark = SparkSession\
        .builder\
        .appName("XGBoost_PySpark")\
        .master("local[*]")\
        .getOrCreate()

spark.sparkContext.addPyFile("sparkxgb.zip")

После того, как вы выполните вышеуказанные шаги, вы можете провести перекрестную проверку, импортировав XGBClassifier или Regressor.

from sparkxgb.xgboost import XGBoostClassificationModel, XGBoostClassifier

если в случае, если вы получите ошибку JAR, то, возможно, это связано с несоответствием версии, в этом случае вы можете проверить, загрузив другую версию файлов JAR здесь.

Теперь, когда все сделано, давайте попробуем построить одну простую модель, следуя приведенным ниже шагам.

Здесь мы взяли набор данных прогнозирования ссуды с открытым исходным кодом и попытались предсказать, будет ли одобрена ссуда или нет, здесь Loan_Status — наша целевая переменная (так как она находится в формате Y/N, который мы преобразовали то же самое в 1/0).

  1. Добавьте необходимые библиотеки Spark
from pyspark.ml.feature import StringIndexer, VectorAssembler
from sparkxgb.xgboost import XGBoostClassificationModel, XGBoostClassifier
from pyspark.ml import Pipeline
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import functions as F

2. Загрузите набор данных и выполните необходимую предварительную обработку

data = spark.read.parquet('train.parquet')
data = data.withColumn('label', F.when(F.col('Loan_Status')=='Y', 1) \
                                            .otherwise(0)
                                  )
data.show(5)
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----+
| Loan_ID|Gender|Married|Dependents|   Education|Self_Employed|ApplicantIncome|CoapplicantIncome|LoanAmount|Loan_Amount_Term|Credit_History|Property_Area|Loan_Status|label|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----+
|LP001002|  Male|     No|         0|    Graduate|           No|           5849|              0.0|      null|           360.0|           1.0|        Urban|          Y|    1|
|LP001003|  Male|    Yes|         1|    Graduate|           No|           4583|           1508.0|     128.0|           360.0|           1.0|        Rural|          N|    0|
|LP001005|  Male|    Yes|         0|    Graduate|          Yes|           3000|              0.0|      66.0|           360.0|           1.0|        Urban|          Y|    1|
|LP001006|  Male|    Yes|         0|Not Graduate|           No|           2583|           2358.0|     120.0|           360.0|           1.0|        Urban|          Y|    1|
|LP001008|  Male|     No|         0|    Graduate|           No|           6000|              0.0|     141.0|           360.0|           1.0|        Urban|          Y|    1|
+--------+------+-------+----------+------------+-------------+---------------+-----------------+----------+----------------+--------------+-------------+-----------+-----+

Как и в приведенном выше образце данных, мы можем видеть несколько столбцов, имеющих категориальные/строковые значения. Поэтому нам нужно преобразовать их в числовые значения, прежде чем передавать их в модель, PySpark предоставляет StringIndexer & OneHotEncoder для того же здесь мы собираемся StringIndexer.

index1 = StringIndexer().setInputCol("Gender").setOutputCol("GenderIndex").setHandleInvalid("keep")
index2 = StringIndexer().setInputCol("Married").setOutputCol("MarriedIndex").setHandleInvalid("keep")
index3 = StringIndexer().setInputCol("Education").setOutputCol("EducationIndex").setHandleInvalid("keep")
index4 = StringIndexer().setInputCol("Self_Employed").setOutputCol("SelfEmployedIndex").setHandleInvalid("keep")
index5 = StringIndexer().setInputCol("Property_Area").setOutputCol("PropertyAreaIndex").setHandleInvalid("keep")

Теперь, как мы видим, у нас есть несколько объектов StringIndexer, и нам нужно выполнить преобразование через каждый индексатор, что является длительным процессом, поэтому необходимо объединить все преобразования, векторизацию признаков и подгонку модели PySpark ML предоставляет концепцию ML Pipeline, в которой мы можем объединять несколько шагов вместе и запускать их на ходу,

Перед созданием объекта Pipeline давайте определим объекты VectorAssembler и Model.

## define list of your final features
features = ['GenderIndex', 'MarriedIndex', 'EducationIndex', 'SelfEmployedIndex', 'PropertyAreaIndex',
           'ApplicantIncome', 'CoapplicantIncome', 'LoanAmount', 'Loan_Amount_Term', 'Credit_History']

vec_assembler = VectorAssembler(inputCols=features, outputCol='features', handleInvalid='keep')

xgb = XGBoostClassifier(objective="binary:logistic",seed=1712,
                        featuresCol="features",
                        labelCol="label",
                        missing=0.0,
                        )

Теперь давайте создадим объект конвейера и построим окончательную модель.

3. Создать конвейер машинного обучения

# here add all your steps inside setStages

pipeline = Pipeline().setStages([index1, index2, index3, index4, index5, vec_assembler, xgb])

# split the data in train and test
trainDF, testDF = train_data.randomSplit([0.7, 0.3], seed=1712)

model = pipeline.fit(trainDF)

# Generate the prediction on test data

predictions = model.transform(testDF)[['Loan_ID', 'prediction', 'label']]
predictions.show()
+--------+----------+-----+
| Loan_ID|prediction|label|
+--------+----------+-----+
|LP001005|       1.0|    1|
|LP001013|       1.0|    1|
|LP001018|       1.0|    1|
|LP001027|       1.0|    1|
|LP001032|       1.0|    1|
+--------+----------+-----+

вау.. наконец-то мы построили модель XGBoost и протестировали ее с помощью PySpark

4. Проверьте показатели эффективности

from pyspark.sql.types import DoubleType

predictionAndLabels = predictions.select(['prediction', 'label']\
                                  ).withColumn('label',F.col('label').cast(DoubleType())).rdd

metrics = MulticlassMetrics(predictionAndLabels)

cm = metrics.confusionMatrix().toArray()

accuracy=(cm[0][0]+cm[1][1])/cm.sum()
precision=(cm[1][1])/(cm[0][1]+cm[1][1])
recall=(cm[1][1])/(cm[1][0]+cm[1][1])

print(accuracy, precision, recall)
(0.7015706806282722, 0.7517241379310344, 0.8384615384615385)

5. Настройка гиперпараметров (необязательный шаг)

Если вы хотите настроить параметры XGBoost, следуйте приведенному ниже коду, на самом деле это не осуществимый способ, так как для большого объема данных это очень дорого, так как он строит модель для каждой комбинации параметров, лучше Подход заключается в использовании RandomSearchCV в python и использовании этого параметра здесь, или вы можете разработать код в PySpark, используя случайное значение, выбираемое и назначаемое модели каждый раз.

from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator

xgbEval = BinaryClassificationEvaluator()

# Define your list of grid search parameters

paramGrid = (ParamGridBuilder()
             .addGrid(xgb.alpha,[1e-5, 1e-2, 0.1])
             .addGrid(xgb.eta, [0.001, 0.01])
             .addGrid(xgb.numRound, [150,160])
             .addGrid(xgb.maxDepth, range(3,7,3))
             .addGrid(xgb.minChildWeight, [3.0, 4.0])
             .addGrid(xgb.gamma, [i/10.0 for i in range(0,2)])
             .addGrid(xgb.colsampleBytree, [i/10.0 for i in range(3,6)])
             .addGrid(xgb.subsample, [0.4,0.6])
             .build())

cv = CrossValidator(estimator=pipeline, estimatorParamMaps=paramGrid, evaluator=xgbEval, numFolds=3)
cvModel = cv.fit(trainDF)
cvPreds = cvModel.transform(testDF)
xgbEval.evaluate(cvPreds)

## Print the tuned params
cvModel.bestModel.extractParamMap()

Спасибо, что нашли время, чтобы прочитать мой блог, я надеюсь, что это было полезно для вас. Подводя итог, мы немного узнали о XGBoost и его преимуществах. Мы также рассмотрели пошаговую реализацию XGBoost в PySpark с использованием внешних зависимостей. Я с нетерпением жду всевозможных предложений от моих читателей, дайте мне знать, о чем вы хотите, чтобы я написал дальше.