Уменьшите потребление памяти Dask XGBoost

Я пишу простой код сценария для обучения предсказателя XGBoost в моем наборе данных. Это код, который я использую:

import dask.dataframe as dd
import dask_ml
from dask.distributed import Client, LocalCluster
import sys
from dask_ml.model_selection import train_test_split
import dask
import xgboost
import dask_xgboost

def start_cluster(n_workers=1, threads_per_worker=2, memory_limit="12GB", processes=False):
    cluster = LocalCluster(
        n_workers=n_workers, threads_per_worker=threads_per_worker, memory_limit=memory_limit, processes=processes
    )
    client = Client(cluster)  # use default n_threads and mem
    print(client)
    print(client.cluster)
    print("Client infos:", client.scheduler_info())
    return client

client = start_cluster()

dask_df = dd.read_parquet('./sample_dataset', engine='pyarrow')

dask_df=dask_df.drop(
    ['mapped_tweet_id',
     'mapped_creator_id',
     'mapped_engager_id',
     'engagement_retweet_timestamp',
     'engagement_comment_timestamp',
     'engagement_reply_timestamp',
     'mapped_tweet_links',
     'mapped_domains',
     'mapped_tweet_hashtags'
    ], axis=1
)

y = dask_df['engagement_like_timestamp']>0

dask_df=dask_df.drop(
    [
     'engagement_like_timestamp',
    ], axis=1
)

X_train, X_test, y_train, y_test = train_test_split(dask_df, y, test_size=0.2, shuffle= True)

params = {'objective': 'binary:logistic',
          'max_depth': 4, 'eta': 0.01, 'subsample': 0.5,
          'min_child_weight': 0.5}

bst = dask_xgboost.train(client, params, X_train, y_train, num_boost_round=10)

Он работает нормально, но я продолжаю получать обычное предупреждение сборщика мусора, связанное с Dask (distributed.utils_perf - WARNING - full garbage collections took 36% CPU time recently (threshold: 10%))

Я проанализировал доступную панель управления и заметил, что мой код продолжает увеличивать использование памяти, пока не достигнет предела 80% (я изменил стандартные настройки внутри папки .config), а затем он начинает замедляться из-за сборщика мусора.

Вот пример:

Панель управления

По сути, это продолжается до тех пор, пока не заполнится вся доступная память. Набор данных, который я использую, довольно большой, поэтому я использую Dask. Однако кажется, что он в основном загружает весь набор данных в память (я не знаю, верно ли это предположение, но кажется, что это так из-за задач чтения-паркета).

Код довольно прост и, похоже, не вызывает больших проблем.

Автор библиотеки в следующем выпуске (Dask Github Issue) говорит:

Также стоит отметить, что это сообщение об ошибке

Распределенный.utils_perf - ПРЕДУПРЕЖДЕНИЕ - в последнее время полная сборка мусора занимала 47% процессорного времени (порог: 10%) чаще всего (но не исключительно) является ошибкой кода, который вы запускаете, и не имеет ничего общего с Dask. Даск просто находится в хорошем положении, чтобы сообщить вам, если такие вещи происходят.

Однако, как я уже сказал, код действительно прост.

  1. Как я могу убрать это предупреждение? Это действительно снижает производительность моего кода
  2. Я использую Dask для работы по частям, поскольку мой набор данных слишком велик для размещения в памяти. Однако кажется, что он загружает все в память, что делает использование Dask бесполезным. Как я могу заставить его работать должным образом (по частям)?

person Mattia Surricchio    schedule 01.05.2021    source источник


Ответы (1)


Не могли бы вы попробовать использовать xgboost.dask. XGboost теперь имеет встроенную поддержку Dask (прочтите наши блог)

Вот несколько документов: https://xgboost.readthedocs.io/en/latest/tutorials/dask.html

Вот пример кода с сайта:

import xgboost as xgb
import dask.array as da
import dask.distributed

cluster = dask.distributed.LocalCluster(n_workers=4, threads_per_worker=1)
client = dask.distributed.Client(cluster)

# X and y must be Dask dataframes or arrays
num_obs = 1e5
num_features = 20
X = da.random.random(
    size=(num_obs, num_features),
    chunks=(1000, num_features)
)
y = da.random.random(
    size=(num_obs, 1),
    chunks=(1000, 1)
)

dtrain = xgb.dask.DaskDMatrix(client, X, y)

output = xgb.dask.train(client,
                        {'verbosity': 2,
                         'tree_method': 'hist',
                         'objective': 'reg:squarederror'
                         },
                        dtrain,
                        num_boost_round=4, evals=[(dtrain, 'train')])

Не могли бы вы сообщить нам, где вы упомянули предыдущий метод использования dask и xgboost? Если это есть в нашей документации, я бы хотел исправить это!

person TaureanDyerNV    schedule 03.05.2021
comment
Я попробую код как можно скорее! Однако я прочитал следующие документы: ml.dask.org/xgboost.html, examples.dask.org/machine-learning/xgboost.html и ml.dask.org/modules/generated/dask_ml.xgboost.train.html . Меня смутило количество разных интерфейсов xgboost, включений и вызовов. Как вы можете заметить, в каждой ссылке документации интерфейс немного отличается (а также необходимый оператор импорта). Документацию вы связали с новой официальной? - person Mattia Surricchio; 03.05.2021
comment
И я также хотел знать, может ли связанный вами xgboost обучаться на наборах данных, которые не помещаются в память (ни локальные, ни распределенные). Чтобы прояснить этот момент, я использую локальный компьютер объемом 16 ГБ (локальный кластер) с набором данных, превышающим мою память. Способна ли эта реализация обучать набор данных по частям без загрузки всего набора данных в память? - person Mattia Surricchio; 03.05.2021