Ускорьте конвейер обработки больших данных за 10 минут

Авторы: Нан Сюй, Вэйцзя Ли, Хаоран Чен

Этот блог создается и поддерживается студентами программы профессионального магистра в Школе компьютерных наук Университета Саймона Фрейзера в рамках их кредита на курс. Чтобы узнать больше об этой уникальной программе, посетите {sfu.ca/computing/pmp}.

Dask - это универсальное решение для общей обработки больших данных. Независимо от того, являетесь ли вы разработчиком Python, желающим ускорить существующие кодовые базы, специалистом по обработке данных, стремящимся извлечь понимание из сложных данных, или микробиологом, который хочет анализировать терабайты изображений, Dask поможет вам.

Созданный с нуля на Python, Dask действительно единственный в своем роде. Поскольку он разработан совместно с командами Pandas, scikit-learn и Jupyter, он предлагает много вещей, которых нет у его конкурента PySpark. Благодаря Dask разработчикам Python больше не нужно читать сложные сообщения об ошибках Java, постоянно переключаться между разными синтаксисами или переписывать всю кодовую базу, чтобы извлечь выгоду из распределенных вычислений.

Dask также упрощает рабочий процесс с большими данными. Его превосходная производительность на одной машине ускоряет этап прототипирования и приводит к более быстрому развертыванию модели. Для любого, кто имеет опыт работы с Pandas, NumPy или SciPy, распараллеливание существующего рабочего процесса с помощью Dask безболезненно и требует лишь небольших изменений. Dask предоставляет самый простой способ развернуть не только статистический анализ данных, но и конвейер машинного обучения и обработки изображений на кластерах.

Dask: параллельное программирование на Python

Что такое Даск?

Гибкая библиотека для параллельных вычислений на Python.

Dask - это библиотека Python, использующая планирование задач для решения вычислительных задач. Dask предоставляет наиболее широко используемые структуры данных, унаследованные от Pandas и Numpy, а также базовые интерфейсы параллельных вычислений, основанные на его собственной системе планирования задач, для осуществления крупномасштабных вычислений данных.

Почему стоит выбрать Dask?

Код на Python, параллельные вычисления.

Как специалист по анализу данных или инженер по машинному обучению, вы можете столкнуться с несколькими проблемами во время своего проекта:

  1. Набор данных очень велик, поэтому вашему компьютеру не хватает памяти.
  2. Вы ожидаете переключения между кластером и домашней рабочей станцией.
  3. Многопроцессорность или многопоточность вычислений - это то, о чем вы всегда мечтаете.
  4. Доступны и другие фреймворки, но их API отличаются от того, что вы обычно используете.
  5. Понимание порядка вычислений необходимо.
  6. Вам нужен Python для всего в вашем проекте.

Dask решает ВСЕ ЭТИ за вас!

При использовании Dask вы можете использовать все ваши знакомые библиотеки и инструменты Python и выполнять вычисления параллельно. Dask настраивает некоторые из наиболее часто используемых структур данных в соответствии с требованиями к большим наборам данных и поддерживает вычисление и обработку всего в запланированном порядке. Независимо от того, используете ли вы ноутбук или владеете кластером из 1000 процессоров, Dask работает по одной и той же стратегии, предлагая высокую точность и надежность.

Ленивая оценка, метод минимизации работы, выполняемой для вычислений, используется для планирования и оптимизации всех задач в программе до получения окончательного результата.

Весь граф задач можно легко визуализировать с помощью одной строчки кода, и это поможет вам понять, где оптимизировать процесс от начала до конца.

Как использовать Dask?

Делайте все, что вы делаете с Python.

Dask можно установить с помощью conda или pip.

# Install with conda
conda install -c conda-forge dask
# Install with pip
pip install dask
  • Две самые важные функции в Dask

Чтобы понять и запустить код Dask, вам нужно знать две первые функции: .visualize() и .compute().

.visualize() обеспечивает визуализацию графа задач, графа функций Python и взаимосвязей между собой. На основе этих зависимостей планировщик задач в Dask определяет, как запускать функции параллельно. Параметр rankdir="LR" полезен, если предполагается, что график будет просматриваться слева направо.

Dask использует стратегию отложенного вычисления, поэтому программа вычисляет результаты только после вызова .compute(). Чтобы не вызывать .compute() несколько раз для получения результатов в коллекциях, существует также .compute() функция, принимающая несколько коллекций и возвращающая несколько результатов.

y = (x + 1).sum()
z = (x + 1).mean()
# compute results of y and z at once
da.compute(y, z)
  • Коллекции высокого уровня
  1. Множество

dask.array разбивает большой массив на маленькие блоки по ndarray.

Быстрый пример визуализации этого - создание 2D-массива из 100 000 * 100 000 чисел с 10 000 фрагментов размером 1000 * 1000.

import dask.array as da
x = da.random.random((100000, 100000), chunks=(1000, 1000))

Исходный массив около 80 ГБ! nparray может быть чрезвычайно сложно справиться с этим на большинстве персональных компьютеров. Но, как мы видим, на каждый фрагмент приходится всего 8 МБ, что намного меньше и проще в обработке.

Здесь поддерживается большинство функций, которые вы хотите вызвать nparray, например:

y = x + x.T
z = y[::2, 5000:].mean(axis=1)

2. Фрейм данных

dask.dataframe реализован на основе pandas.dataframe, объединяя несколько фреймов данных Pandas по индексу в огромный фрейм данных.

Функции dask.dataframe - это подмножество, скопированное из pandas:

import dask.dataframe as dd
df = dask.datasets.timeseries()
df2 = df[df.y > 0]
df3 = df2.groupby('name').x.std()
df3.head(20)

3. Сумка

dask.bag реализован на основе python.list, который предназначен для простых параллельных вычислений для неструктурированных или полуструктурированных наборов данных, таких как текстовые файлы и объекты JSON.

Вы можете выполнять вызовы функций, как с pyspark.rdd или pytoolz:

import dask.bag as db
b = db.read_text('data/*.json').map(json.loads)
b.map(lambda record: record['occupation']).take(2)
b.filter(lambda record: record['age'] > 30).take(2)
b.count().compute()
  • Низкоуровневый интерфейс

Иногда вы можете захотеть распараллелить свой алгоритм на некоторых небольших задачах, но Array, DataFrame или Bag недостаточно для использования, или вы хотите создать некоторые функции самостоятельно. Тогда пришло время использовать dask.delayed(). Гораздо проще использовать .delayed() для параллельного программирования, которое вызывает только dask.delayed(func)(parameters).

dask.delayed() очень хорошо работает с циклами, например:

def inc(x):
    return x + 1
def mul(x, y):
    return x * y
def add(x, y):
    return x + y
results = []
for x in [1, 2, 3, 4]:
    a = dask.delayed(inc, pure = True)(1)
    b = dask.delayed(mul, pure = True)(2, x)
    c = dask.delayed(add, pure = True)(a, b)
    results.append(c)
        
total = dask.delayed(sum, pure = True)(results)
total.visualize(rankdir="LR")

Из приведенного выше графика задач вы можете заметить, что inc() вызывается только один раз. Причина в том, что a = inc(1) одно и то же на всех итерациях, поэтому планировщик задач оптимизирует его, и он совместно используется для многократного использования.

Dask-ML: масштабируемое машинное обучение

Что такое Dask-ML?

Фантастическая библиотека для параллельного и распределенного машинного обучения на Python.

Dask-ML нацелен на обеспечение машинного обучения для больших наборов данных и более крупных проблем - масштабируемое машинное обучение. Он построен на основе Dask и может рассматриваться как параллельная реализация scikit-learn.

Почему стоит выбрать Dask-ML?

Он обеспечивает масштабируемое машинное обучение.

Вы можете задаться вопросом, зачем нам использовать Dask-ML, если scikit-learn уже доступен. Что ж, это хороший улов!

Любой проект машинного обучения может столкнуться с одной из следующих проблем:

  1. Большие наборы данных: данных намного больше, чем может обработать ваш маленький компьютер.
  2. Большие модели: данные хорошо подходят, но обучение моделей и настройка гиперпараметров занимает несколько дней, больше, чем вы можете ждать.

Тогда как вы справляетесь с такими проблемами масштабирования? Ну, это зависит от того, с какой проблемой вы столкнулись.

а. Для проблем с памятью: просто используйте scikit-learn или любую другую библиотеку машинного обучения, которая вам нравится.

Примечание: scikit-learn - отличная библиотека для машинного обучения в памяти на моделях, которые не слишком велики и данные могут уместиться в ОЗУ. Но он плохо масштабируется.

б. Для больших моделей: используйте распределенную scikit-learn, способ взять вашу любимую модель scikit-learn и использовать кластер для обучения вместо одной машины. Отметьте dask_ml.joblib.

Примечание: scikit-learn уже позволяет выполнять параллельные вычисления на одной машине с Joblib. Даск масштабирует этот параллелизм до кластера машин.

c. Для больших наборов данных: используйте встроенные оценщики Dask-ML.

Примечание. Dask-ML повторно реализует некоторые алгоритмы машинного обучения, которые можно записать как алгоритмы NumPy, и заменяет массивы NumPy массивами Dask для достижения масштабируемых алгоритмов. Они хорошо обрабатывают наборы данных размером больше памяти. Взгляните на линейные модели и кластеризацию.

О, вы можете задаться вопросом, как Dask-ML работает с XGBoost и TensorFlow. Что ж, Dask-ML не реализует их заново, поскольку у них уже есть свои собственные распределенные решения, которые работают довольно хорошо. Если вас интересуют подробности, отметьте Dask-ML + XGBoost и Dask-ML + TensorFlow.

Как использовать Dask-ML?

Аналогично использованию scikit-learn.

Dask-ML может быть установлен как с conda, так и с pip.

# Install with conda
conda install -c conda-forge dask-ml
# Install with pip
pip install dask-ml

Фактически, Dask-ML предоставляет широкий спектр функций, позволяющих решать проблемы, в которых обычно используется scikit-learn. Он охватывает темы от предварительной обработки данных и перекрестной проверки до поиска гиперпараметров и моделей / оценок.

Если вы знакомы с scikit-learn, вы будете чувствовать себя как дома с Dask-ML! Следовательно, мы не собираемся вести вас здесь шаг за шагом.

В приведенном ниже примере показано, как использовать встроенный в Dask-ML оценщик логистической регрессии для прогнозирования. Попробуйте использовать свой собственный набор данных и получайте удовольствие!

from dask_ml.datasets import make_regression
from dask_ml.model_selection import train_test_split
from dask_ml.linear_model import LinearRegression
X, y = make_regression(n_samples=1250, n_features=4, chunks=50)
# split into train and test data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
# fit the model
lr = LinearRegression()
lr.fit(X_train, y_train)
# make the prediction on test data
prediction = lr.predict(X_test)

И вот краткий список распространенных API. Узнайте больше о dask_ml.

Dask-Image: распределенная обработка изображений

Что такое Dask-Image?

Единственная библиотека Python для распределенной обработки изображений.

Dask-Image разработан с одной конкретной целью: упростить рабочий процесс для обработки крупномасштабных данных изображений. Построенный на основе Dask, Dask-Image хорошо интегрирует библиотеку обработки изображений SciPy вместе с возможностями масштабируемых параллельных вычислений Dask и создает простую в использовании распределенную библиотеку обработки изображений для всех. Это действительно первый в своем роде.

Почему стоит выбрать Dask-Image?

Изображения стали больше, поэтому используйте свой кластер.

Для многих областей, связанных с наукой, таких как геопространственные изображения, астрономия и нейробиология, размер изображений уже находится в диапазоне терабайт. Эта ситуация создает множество новых проблем для традиционного рабочего процесса обработки изображений:

  1. Не помещается в памяти: нам нужно написать дополнительный код для обработки фрагмента изображения фрагментом.
  2. Не параллельно: не все знают, как писать низкоуровневый код для параллельного выполнения задач.
  3. Невозможно проверить: поскольку фрагменты обрабатываются последовательно, мы не можем проверять промежуточные результаты всего изображения.
  4. Накладные расходы на вычисления: такие функции, как фильтр Гаусса, требуют знания соседних пикселей, поэтому фрагменты должны перекрываться друг с другом.

Если вы хотите сохранить существующий рабочий процесс без потери производительности, Dask-Image может вам помочь. Благодаря возможностям распределенных вычислений Dask теперь исследователи могут быстрее получать результаты, не беспокоясь о написании кода ядра. Dask-Image предоставляет знакомый SciPy-подобный API, позволяющий запускать конвейер обработки изображений в кластере. Также стоит отметить, что Dask-Image может помочь с тривиальными параллельными проблемами, такими как пакетная обработка небольших изображений.

Как использовать Dask-Image?

Практически то же самое, что и SciPy.

Dask-Image можно установить с помощью conda или pip.

# Install with conda
conda install -c conda-forge dask-image
# Install with pip
pip install dask-image

Предположим, мы хотим проанализировать большое изображение с помощью Dask-Image. Как вы понимаете, первым шагом является загрузка изображения, но можете ли вы догадаться, как называется функция загрузки изображения в Dask-Image?

Это .imread(), такое же имя используется во многих других библиотеках Python.

У вас возникает ощущение, что Даск действительно хочет упростить вещи, не так ли?

Здесь мы предполагаем, что изображение разбито на части, что является обычной практикой для хранения изображений, которые не помещаются в памяти. Эти фрагменты хранятся в папке с именем «example-tile» и имеют общий префикс «image».

import dask_image.imread
import dask_image.ndfilters
import dask_image.ndmeasure
import dask.array as da
import matplotlib.pyplot as plt
import os
%matplotlib inline
filename_pattern = os.path.join('example-tiles', 'image*.png')
images_chunks = dask_image.imread.imread(filename_pattern)
fig, ax = plt.subplots(nrows=3, ncols=3, figsize=(17,10),sharex=True,sharey=True,gridspec_kw={'wspace': 0.05,'hspace': 0.1})
for i in range(3):
    for j in range(3):
        ax[i,j].imshow(images_chunks[i*3+j])
        ax[i,j].axis('off')
plt.show()

Мы знаем, что Dask-Image работает должным образом. А теперь перейдем к настоящему делу.

Большинство конвейеров обработки изображений включают преобразование цветового пространства из RGB в оттенки серого. Мы сделаем то же самое и применим к этим фрагментам специальную функцию.

def grayscale(rgb):
    result = ((rgb[..., 0] * 0.3) + 
              (rgb[..., 1] * 0.59) + 
              (rgb[..., 2] * 0.11))
    return result
result = grayscale(images_chunks)
result.visualize()

Посмотрите на график задач выше. У нас есть девять идеально разделенных групп, а это означает, что преобразование цветового пространства - просто тривиальная параллельная задача.

Используйте следующий код для визуализации полученных изображений.

fig, ax = plt.subplots(nrows=3, ncols=3, figsize=(17,10), \
            sharex=True, sharey=True, gridspec_kw={'wspace': 0.05,'hspace': 0.1})
for i in range(3):
    for j in range(3):
        ax[i,j].imshow(result[i*3+j], cmap='gray')
        ax[i,j].axis('off')
plt.show()

Результаты выше выглядят идеально.

Давай попробуем что-нибудь другое. Мы собираемся показать вам, как объединить их обратно в одно изображение. Поскольку блоки изображений - это просто массивы, мы можем использовать один из многих способов, предлагаемых Dask. На этот раз мы используем функцию .block().

Мы располагаем эти фрагменты изображения в соответствии с их пространственным расположением, прежде чем объединять их вместе, потому что функции, которые мы хотим использовать дальше, должны знать, как обрабатывать края этих фрагментов.

data = [[result[i*3+j, ...] for j in range(3)] for i in range(3)]
combined_image = da.block(data)

Теперь мы можем попробовать использовать встроенный фильтр Гаусса для сглаживания изображения.

blurred_image = dask_image.ndfilters.gaussian_filter(combined_image, sigma=[0.5, 0.5])
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(17,10))
ax[0].imshow(blurred_image, cmap='gray')
ax[1].imshow(blurred_image - combined_image, cmap='gray')
# Subplot headings
ax[0].set_title('blurred_image')
ax[1].set_title('Difference from original')
# Don't display axes
ax[0].axis('off')
ax[1].axis('off')
# Display images
plt.show(fig)

А вот и самое интересное. Хотя мы уже объединили эти фрагменты в одно изображение, .gaussian_filter() все еще знает, что оно исходит из нескольких фрагментов, и разумным образом распределяет вычислительные задачи между рабочими. Фактически, именно для этого и предназначен Dask-Image. В подобных ситуациях Dask-Image изо всех сил старается распараллеливать вычислительные задачи, чтобы повысить производительность.

Давайте проверим нашу теорию, еще раз посмотрев на ее график задач.

blurred_image.visualize()

Вы могли заметить девять групп вертикально расположенных узлов на приведенной выше диаграмме. У нас девять стволов. Какое совпадение! Каждый из них представляет собой процесс прохождения одного фрагмента изображения.

Помните 2D-список, который мы передали в .block()? Если вы посмотрите достаточно внимательно, вы можете найти связку линий, соединяющих вертикальные группы вместе. Это означает, что .gaussian_filter() требуется информация от соседних пикселей для выполнения вычислений.

Рад, что мы сказали Dask-Image, как расположить наши чанки, не так ли?

Давайте подготовим изображение для количественного анализа, сегментируя его с помощью специальной пороговой функции. Мы можем присвоить каждому сегменту уникальную метку с помощью .label() функции.

threshold_value = 0.5 * da.max(blurred_image).compute()
threshold_image = blurred_image > threshold_value
label_image, num_labels = dask_image.ndmeasure.label(threshold_image)
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(17,10))
ax[0].imshow(threshold_image, cmap='gray')
ax[1].imshow(label_image, cmap='viridis')
# Subplot headings
ax[0].set_title('threshold_image')
ax[1].set_title('label_image')
# Don't display axes
ax[0].axis('off')
ax[1].axis('off')
# Display images
plt.show(fig)

Dask-Image предлагает множество функций в пакете ndmeasure, которые полезны для количественного анализа.

Давайте попробуем получить среднее значение и стандартное отклонение для каждой отмеченной области с помощью встроенных функций.

import pandas as pd
index = list(range(int(num_labels)))
mean_values = dask_image.ndmeasure.labeled_comprehension(combined_image, \
                label_image, index, dask_image.ndmeasure.mean, \
                float, None, pass_positions=False)
stdev_values = dask_image.ndmeasure.labeled_comprehension(combined_image, \
                label_image, index, dask_image.ndmeasure.standard_deviation, \
                float, None, pass_positions=False)
df = pd.DataFrame()
df['label'] = index
df['mean'] = mean_values.compute()
df['standard_deviation'] = stdev_values.compute()
df.head(10)

Мы подошли к концу нашего пошагового руководства Dask-Image.

Окончательные результаты нашего анализа могут быть сохранены в DataFrame pandas.

Следующие шаги

Основанная в 2015 году, Dask превратилась в одну из лучших сред для параллельных вычислений в Python. В будущем мы с нетерпением ждем появления большего количества библиотек и инструментов Python, поддерживающих Dask.

использованная литература

[1] Документация Dask, https://docs.dask.org/en/latest/index.html

[2] Примеры Dask, https://examples.dask.org

[3] Dask-ML, https://ml.dask.org

[4] Изображение Dask, http://image.dask.org/en/latest/