Пошаговое руководство по работе с файлами BigData в Python с помощью Pandas / Dask

Это одно из моих постов из категории Tackle, которое можно найти в моем репозитории на github здесь.

(Edit-31.01.2019) - Добавлена ​​информация о dask.distributed.LocalCluster для BigData

(Edit-12/4/2019) - Добавлены новые разделы об уменьшении размера набора данных и использовании типов файлов [еще не завершено, но вы можете взять идеи и применить их.]

Показатель

  1. "Вступление"
  2. Уменьшение памяти столбца (pd.Series.astype ()) [: Не завершено]
  3. Типы файлов (для меньшего использования памяти) [: Не завершено]
  4. Исследование данных
  5. Предварительная обработка
  6. Пошаговое обучение (с пандами)
  7. Dask (Изучение + Подготовка + FitPredict)
  8. "Дальнейшее чтение"
  9. "Использованная литература"
NOTE:
This post goes along with Jupyter Notebook available in my Repo on Github:[HowToHandleBigData]  (with dummy data)
and 
Kaggle:[HowToHandleBigData] (with Kaggle competition data)

1. Введение ^

Поскольку размер данных растет в геометрической прогрессии, мы не можем вместить наши данные и даже наши модели в память нашего ПК. И все мы не можем позволить себе покупать высококлассные настольные монстры в сборе.

Например, недавний конкурс kaggle имеет набор данных, который не может поместиться в 17 ГБ ОЗУ в ядрах kaggle или в Colab. Он имеет почти 2 миллиона строк, и, кроме того, некоторые столбцы содержат очень большие данные JSON в виде строк. Как нам с этим справиться? К кому мы можем обратиться?

На помощь приходит инкрементальное обучение и / или Dask!

Возможно, вы уже знаете, что нейронные сети по своей природе являются инкрементальными учениками, поэтому мы можем решить эту проблему там. И многие из моделей sklearn предоставляют метод под названием partial_fit, с помощью которого мы можем группировать модели. А некоторые библиотеки Boosting, такие как XGBoost и LightGBM, предоставляют способ постепенного обучения работе с большими данными.

Здесь мы рассмотрим дополнительные решения, предоставляемые некоторыми алгоритмами повышения. А затем мы будем использовать Dask в том же наборе данных и использовать его модели для прогнозирования.

2. Уменьшение памяти столбца

^

# I don't know who the original author of this function is,
# but you can use this function to reduce memory
# consumption by 60-70%!
def reduce_mem_usage(df):
    """ 
    iterate through all the columns of a dataframe and 
    modify the data type to reduce memory usage.        
    """
    start_mem = df.memory_usage().sum() / 1024**2
    print(('Memory usage of dataframe is {:.2f}' 
                     'MB').format(start_mem))
    
    for col in df.columns:
        col_type = df[col].dtype
        
        if col_type != object:
            c_min = df[col].min()
            c_max = df[col].max()
            if str(col_type)[:3] == 'int':
                if c_min > np.iinfo(np.int8).min and c_max <\
                  np.iinfo(np.int8).max:
                    df[col] = df[col].astype(np.int8)
                elif c_min > np.iinfo(np.int16).min and c_max <\
                   np.iinfo(np.int16).max:
                    df[col] = df[col].astype(np.int16)
                elif c_min > np.iinfo(np.int32).min and c_max <\
                   np.iinfo(np.int32).max:
                    df[col] = df[col].astype(np.int32)
                elif c_min > np.iinfo(np.int64).min and c_max <\
                   np.iinfo(np.int64).max:
                    df[col] = df[col].astype(np.int64)  
            else:
                if c_min > np.finfo(np.float16).min and c_max <\
                   np.finfo(np.float16).max:
                    df[col] = df[col].astype(np.float16)
                elif c_min > np.finfo(np.float32).min and c_max <\
                   np.finfo(np.float32).max:
                    df[col] = df[col].astype(np.float32)
                else:
                    df[col] = df[col].astype(np.float64)
        else:
            df[col] = df[col].astype('category')
    end_mem = df.memory_usage().sum() / 1024**2
    print(('Memory usage after optimization is: {:.2f}' 
                              'MB').format(end_mem))
    print('Decreased by {:.1f}%'.format(100 * (start_mem - end_mem) 
                                             / start_mem))
    
    return df

TK

3. Типы файлов

^

HDF5, паркет и др.

Примечание. Если вы не хотите попадать в этот раздел, вы можете просто изучить HDF5. Этого должно быть достаточно. И перейти к следующему разделу.

Иерархический формат данных - Википедия (HDF5)

Паркет Апач

Форматы файлов Hadoop: это уже не просто CSV - Кевин Хаас (Не пугайтесь после прочтения Hadoop, вы можете обойтись без использования только формата Parquet и методов pd.DataFrame.to_parquet и pd.DataFrame.read_parquet.)

Также, если вам интересно: Паркет vs Авро [все равно рекомендую посмотреть.]

TK

4. Исследование данных ^

Во-первых, мы почувствуем, как выглядят наши данные, просмотрев первые несколько строк с помощью команды:

part = pd.read_csv("train.csv.zip", nrows=10)
part.head()

Благодаря этому у вас будет базовая информация о том, как структурированы разные столбцы, как обрабатывать каждый столбец и т. Д. Составьте списки столбцов различного типа, такие как numerical_columns, obj_columns, dictionary_columns и т.д., которые будут содержать все соответствующие столбцы.

Теперь, чтобы исследовать данные, мы будем идти столбец за столбцом следующим образом:

# For dictionary columns you can do:
# 'idx' is index of corresponding column in DataFrame.
# You can find it by using np.where(col==df.columns)
for col in dictionary_columns:
    df = pd.read_csv("train.csv.zip", usecols = [idx], converters={col: json.loads})
    column_as_df = json_normalize(df[col])
    # ... plot each column ...
    # ... check if you want to drop any column ...

Вы можете сохранить словарь со всеми именами столбцов в качестве ключей и методами, применяемыми к нему, в списке в качестве конвейера для этого столбца. Вы также можете создать дамп своего словаря для будущего использования:

with open("preprocessing_pipeline.pickle", "wb") as fle:
  pickle.dump(preprocessing_pipeline, fle)

Если хотя бы один из столбцов слишком велик для вашей памяти, что на самом деле имело место для одной из строк в соревновании kaggle, о котором я упоминал выше. Вы даже можете открывать один столбец постепенно и выполнять базовые операции, такие как вычисление среднего значения, стандартного отклонения и т. Д. Вручную. Или вы также можете использовать для этого Dask и использовать почти тот же API, что и pandas, и вычислять их с его помощью. См. Последний раздел для Dask.

5. Предварительная обработка ^

Для предварительной обработки данных мы будем использовать словарь, который мы создали ранее, в котором есть информация о том, какие столбцы мы хотим сохранить (как ключи) и какие методы применить к каждому столбцу (как значения), чтобы создать метод.

Этот метод будет вызываться для каждого пакета данных в процессе инкрементального обучения.

Теперь следует отметить, что мы установили методы (например, LabelEncoder, Scalars и т. Д.) Во время исследования всего столбца данных, и мы будем использовать их для преобразования данные на каждом шаге приращения здесь. Поскольку в каждом пакете могут отсутствовать некоторые данные, и если бы мы использовали разные LabelEncoder, Scalar и т. Д. Для каждого пакета, эти методы не дали тот же результат для той же категории (скажем). Вот почему мы уже установили целые колонны во время разведки.

Вот как можно предварительно обработать данные:

def preprocess(df):
  df.reset_index(drop=True, inplace=True)
  
  # For dict columns:
  for col in dict_columns:
    col_df = json_normalize(df[col])
    # json.loads during pd.read_csv to convert string to dict.                                
    col_df.columns = [f"{col}.{subcolumn}" for subcolumn in col_df.columns]
    # Select all columns which we selected before.
    selected_columns = [c for c in dictionary.keys() if c in col_df.columns()]
    to_drop = [c for c in col_df.columns if not in selected_columns]
    
    # Drop all previously unselected columns.
    col_df = col_df.drop(to_drop, axis=1)                                       
    
    df = df.drop(col, axis=1).merge(col_df, right_index=True, left_index=True)
    
  # And so on...
  
  # And then apply all Scalars, LabelEncoder's to all columns selected...
  
  return df

6. Пошаговое обучение ^

Чтобы читать файл данных постепенно с использованием pandas, вы должны использовать параметр chunksize, который указывает количество строк для чтения / записи за раз.

incremental_dataframe = pd.read_csv("train.csv",
                        chunksize=100000) # Number of lines to read.
# This method will return a sequential file reader (TextFileReader)
# reading 'chunksize' lines every time. To read file from 
# starting again, you will have to call this method again.

Затем вы можете постепенно обучать свои данные, используя XGBoost ¹ или LightGBM. Для LightGBM вы должны передать аргумент keep_training_booster=True его .train методу и три аргумента .train методу XGBoost.

# First one necessary for incremental learning:
lgb_params = {
  'keep_training_booster': True,
  'objective': 'regression',
  'verbosity': 100,
}
# First three are for incremental learning:
xgb_params = {
  'update':'refresh',
  'process_type': 'update',
  'refresh_leaf': True,
  'silent': False,
  }

На каждом шаге мы сохраняем нашу оценку, а затем передаем ее в качестве аргумента на следующем шаге.

# For saving regressor for next use.
lgb_estimator = None
xgb_estimator = None
for df in incremental_dataframe:
  df = preprocess(df)
  
  xtrain, ytrain, xvalid, yvalid = # Split data as you like
  
  lgb_estimator = lgb.train(lgb_params,
                         # Pass partially trained model:
                         init_model=lgb_estimator,
                         train_set=lgb.Dataset(xtrain, ytrain),
                         valid_sets=lgb.Dataset(xvalid, yvalid),
                         num_boost_round=10)
  
  xgb_model = xgb.train(xgb_params, 
                        dtrain=xgb.DMatrix(xtrain, ytrain),
                        evals=(xgb.DMatrix(xvalid, yvalid),"Valid"),
                        # Pass partially trained model:
                        xgb_model = xgb_estimator)
  
  del df, xtrain, ytrain, xvalid, yvalid
  gc.collect()

Метод инкрементального обучения CatBoost находится в стадии разработки. ²

Чтобы немного ускорить процесс и если ваши фрагменты все еще достаточно велики, вы можете распараллелить свой метод предварительной обработки, используя функции библиотеки Python multiprocessing, например:

n_jobs = 4
for df in incremental_dataframe:
  p = Pool(n_jobs)
  f_ = p.map(preprocess, np.array_split(df, n_jobs))
  f_ = pd.concat(f_, axis=0, ignore_index=True)
  p.close()
  p.join()
  
  # And then your model training ...

Чтобы узнать о параллельном программировании в Python, прочтите мой пост здесь.

7. Даск ^

Dask помогает подключаться к ресурсам больших данных последовательно-параллельным образом.

Для ознакомления с Dask прочтите мой пост здесь.

Аналогичным образом вы можете применить здесь функции из pandas API. Вы можете проверить, есть ли такое нулевое значение:

df.isnull().sum().compute()

Чтобы масштабировать столбец, вы можете преобразовать их в массив и аналогичным образом использовать его функции масштабирования:

rsc = dask_ml.preprocessing.RobustScaler()
result = rsc.fit_transform(X[:,i].reshape(-1, 1)) # for ith column

Для обработки JSON или любых других частично структурированных данных, таких как файлы журналов и т. Д., Вы можете использовать функции, предоставляемые контейнером Dask Bag.

df[key] = df[dict_col].to_bag().pluck(key).to_dataframe().iloc[:,0]

А после предварительной обработки вы можете использовать одну из моделей Dask для обучения ваших данных.

Полный код читайте в разделе readDask в Jupyter Notebook здесь.

Note:
You should only use Dask in case of Big Data, where it is not able to fit in your memory. Otherwise in-memory learning with pandas and sklearn will be lot faster.
Note: (Local Cluster)
You can perform almost any BigData related query/tasks with the help of LocalCluster. You can, specifically, use 'memory_limit' parameter to constrict Dask's memory usage to a specific amount. 
Also, at times you might notice that Dask is exceeding memory use, even though it is dividing tasks. It could be happening to you because of the function you are trying to use on your dataset wants most of your data for processing, and multiprocessing can make things worse as all workers might try to copy dataset to memory. This can happen in aggregating cases.
In these cases you can use Dask.distributed.LocalCluster parameters and pass them to Client() to make a LocalCluster using cores of your Local machines.
from dask.distributed import Client, LocalCluster
client = Client(n_workers=1, threads_per_worker=1, processes=False,
                memory_limit='25GB', scheduler_port=0, 
                silence_logs=False, diagnostics_port=0)
client
'scheduler_port=0' and 'diagnostics_port=0' will choose random port number for this particular client. With 'processes=False' dask's client won't copy dataset, which would have happened for every process you might have made.
You can tune your client as per your needs or limitations, and for more info you can look into parameters of LocalCluster.
You can also use multiple clients on same machine at different ports.

8. Дополнительная информация ^

  1. Https://www.kaggle.com/mlisovyi/bigdata-dask-pandas-flat-json-trim-data-upd
  2. https://github.com/dmlc/xgboost/issues/3055#issuecomment-359505122
  3. Https://www.kaggle.com/ogrellier/create-extracted-json-fields-dataset
  4. Https://github.com/Microsoft/LightGBM/blob/master/examples/python-guide/advanced_example.py
  5. Уменьшение объема памяти DataFrame на ~ 65% | Kaggle
  6. 7 способов обработки больших файлов данных для машинного обучения - мастерство машинного обучения

9. Ссылки ^

  1. Https://www.kaggle.com/mlisovyi/bigdata-dask-pandas-flat-json-trim-data-upd
  2. Https://www.kaggle.com/ogrellier/create-extracted-json-fields-dataset
  3. Https://gist.github.com/goraj/6df8f22a49534e042804a299d81bf2d6
  4. Https://github.com/dmlc/xgboost/issues/3055
  5. Https://github.com/catboost/catboost/issues/464
  6. Https://github.com/Microsoft/LightGBM/issues/987
  7. Https://gist.github.com/ylogx/53fef94cc61d6a3e9b3eb900482f41e0
Suggestions and reviews are welcome.
Thank you for reading!

Подпись: