Vaex — невероятно мощная библиотека DataFrame на Python, способная обрабатывать сотни миллионов или даже миллиардов строк в секунду без загрузки всего набора данных в память. Это делает его особенно полезным для исследования, визуализации и статистического анализа больших наборов данных, которые превышают доступную оперативную память одной машины.

Однако дело не только в скорости, но и в удобстве и простоте использования. В этой статье мы рассмотрим 8 мощных и удобных функций, которые сделают ваш рабочий процесс анализа данных намного более эффективным.

1. Лениво открывать и работать с огромными файлами CSV

С момента своего создания Vaex мог очень эффективно работать с файлами, размер которых намного превышал объем доступной оперативной памяти. На самом деле это и есть цель библиотеки: позволить вам легко работать с произвольно большими наборами данных на общедоступном оборудовании. Раньше это было возможно только с двоичными форматами файлов, такими как HDF5, Arrow и Parquet.

Однако, начиная с версии Vaex 4.14.0, можно лениво открывать и работать с CSV-файлами так же просто, как и с упомянутыми выше бинарными форматами файлов. Отчасти это благодаря проекту Apache Arrow, который предоставляет довольно эффективную программу для чтения CSV. Давайте посмотрим, как это работает на практике.

print('Check file size on disk:')
!du -h /data/jovan/data/chicago-taxi/chicago_taxi_2013_2020.csv
print()

df = vaex.open('/data/jovan/data/chicago-taxi/chicago_taxi_2013_2020.csv')

print(f'Number of rows: {df.shape[0]:,}')
print(f'Number of columns: {df.shape[1]}')

mean_tip_amount = df.tip_amount.mean(progress='widget')
print(f'Mean tip amount: {mean_tip_amount:.2f}')

df.fare_amount.viz.histogram(shape=128, figsize=(6, 4), limits=[0, 42], progress='widget');

В приведенном выше примере показано, как легко можно работать с довольно большим CSV-файлом. Опишем приведенный выше пример немного подробнее:

  • Когда мы используем vaex.open() с CSV-файлом, Vaex будет выполнять потоковую передачу по всему CSV-файлу, чтобы определить количество строк и столбцов, а также тип данных каждого столбца. Хотя это не требует значительного объема ОЗУ, это может занять некоторое время, в зависимости от количества строк и столбцов CSV. Можно контролировать, насколько тщательно Vaex читает файл с помощью аргумента schema_infer_fraction. Меньшее число приведет к более быстрому чтению, но определение типа данных может быть менее точным — все зависит от содержимого CSV-файла. В приведенном выше примере мы читаем файл CSV размером 76 ГБ, состоящий из чуть менее 200 миллионов строк и 23 столбцов, примерно за 5 секунд с параметрами по умолчанию.
  • Затем мы вычислили среднее значение столбца «tip_amount», что заняло 6 секунд.
  • Наконец, мы построили гистограмму для столбца `tip_amount`, что заняло 8 секунд.

В целом мы прочитали весь CSV-файл размером 76 ГБ 3 раза менее чем за 20 секунд, без необходимости загружать весь файл в память.

Также обратите внимание, что API и общее поведение Vaex одинаковы, независимо от формата файла. Это означает, что вы можете легко переключаться между файлами CSV, HDF5, Arrow и Parquet без изменения кода.

Конечно, использование CSV-файлов неоптимально с точки зрения производительности, и его следует избегать по целому ряду причин. Тем не менее, большие CSV-файлы всплывают в открытом доступе, что делает эту функцию весьма удобной для быстрой проверки и исследования их содержимого, а также для эффективного преобразования в более подходящие форматы файлов.

2. Группировка по агрегатам

Группировка по — одна из самых распространенных операций в анализе данных, не нуждающаяся в особом представлении. Существует два основных способа указать операции агрегации в Vaex:

  • Вариант 1. Укажите столбец, который необходимо агрегировать, и псевдоним операции агрегирования. Результирующий столбец будет иметь то же имя, что и входной столбец.
  • Вариант 2. Укажите имя выходного столбца, а затем явно укажите метод агрегации vaex.

Давайте посмотрим, как это работает на практике. Обратите внимание, что в оставшейся части этой статьи мы будем использовать подмножество набора данных такси Нью-Йорка, которое содержит более 1 миллиарда строк!

df = vaex.open('/data/yellow_taxi_2009_2015_f32.hdf5')

print(f'Number of rows: {df.shape[0]:,}')
print(f'Number of columns: {df.shape[1]}')


df.groupby(df.vendor_id, progress='widget').agg(
    {'fare_amount': 'mean',                              # Option 1
     'tip_amount_mean': vaex.agg.mean(df.tip_amount),    # Option 2
    })

Приведенный выше пример довольно стандартный. На самом деле многие библиотеки DataFrame имеют похожий синтаксис. Чем хорош Vaex, так это тем, что второй вариант позволяет создавать линейную комбинацию агрегаторов. Например:

df.groupby(df.vendor_id, progress='widget').agg(
    {'fare_amount_norm': vaex.agg.mean(df.fare_amount) / vaex.agg.std(df.fare_amount)}
    )

Явное указание функций агрегации (вариант 2 выше) имеет дополнительное преимущество: оно позволяет указать «выборку», то есть подмножество данных, с которыми будет работать агрегатор. Это особенно полезно, когда нужно выполнить несколько операций агрегирования с разными подмножествами данных в одной и той же операции группировки! Например:

df.groupby(df.vendor_id, progress='widget').agg(
    {'fare_amount_mean_all': vaex.agg.mean(df.fare_amount),
     'fare_amount_mean_pc2': vaex.agg.mean(df.fare_amount, selection=df.passenger_count == 2),
     'fare_amount_mean_pc4': vaex.agg.mean(df.fare_amount, selection=df.passenger_count == 4)}
)

За один проход по данным мы вычислили среднее значение столбца «fare_amount» для всех строк, а также для строк, где количество пассажиров было 2 и 4. Это очень удобная функция! Просто подумайте, как бы вы сделали это с другими библиотеками DataFrame или SQL. Нужно будет выполнить три отдельных групповых агрегирования, а затем объединить результаты вместе. Vaex немного упрощает это.

3. Индикаторы выполнения

При выполнении анализа данных или написании конвейеров преобразования данных обычно используются различные этапы, некоторые из которых более сложны, чем другие. Таким образом, часто бывает полезно иметь представление о том, сколько времени занимает каждый шаг и сколько времени осталось до завершения всего конвейера, особенно при работе с довольно большими файлами. Возможно, вы уже заметили, что многие методы Vaex поддерживают индикаторы выполнения. Это тоже весьма полезно, но мы можем сделать лучше! С помощью богатой библиотеки Vaex может предоставить подробную информацию о ходе каждой отдельной операции в процессе преобразования данных. Это особенно полезно для выявления узких мест и способов повышения производительности вашего кода. Давайте посмотрим, как это выглядит на практике:

with vaex.progress.tree('rich'):
    result_1 = df.groupby(df.passenger_count, agg='count')
    result_2 = df.groupby(df.vendor_id, agg=vaex.agg.sum('fare_amount'))
    result_3 = df.tip_amount.mean()

В приведенном выше примере сначала мы хотим узнать, сколько выборок имеется для каждого значения столбца «passenger_count». Для этого Vaex сначала вычисляет набор уникальных значений «passenger_count», а затем выполняет агрегацию для каждого из них.

Нечто подобное происходит и во второй операции, где мы хотим узнать общее «fare_amount» для каждого «vendor_id».

Наконец, мы просто хотим узнать среднее значение столбца «tip_amount», которое представляет собой отношение суммы к количеству выборок в этом столбце.

В дереве прогресса выше вы можете увидеть время, затраченное на каждый из этих шагов. Это упрощает выявление узких мест, что дает возможность улучшить процесс обработки данных. Дерево прогресса также позволяет заглянуть за кулисы и посмотреть, какие операции выполняет Vaex, чтобы получить нужный вам результат.

Обратите внимание, что справа от третьего прогресса есть числа в квадратных скобках рядом с таймингами. Эти числа указывают, сколько раз Vaex должен просмотреть данные, чтобы вычислить результат до этой конкретной точки в наборе операций, заключенных в диспетчере контекста дерева выполнения. Например, мы можем видеть, что при вычислении среднего значения столбца Vaex требуется только один проход по данным для вычисления как количества, так и суммы этого столбца.

4. Асинхронные оценки

Vaex старается быть максимально ленивым и вычисляет выражения только тогда, когда это необходимо. Общее правило заключается в том, что для операций, которые не изменяют фундаментальную природу исходного DataFrame, операции оцениваются лениво. Примером этого является создание новых столбцов из существующих, например, объединение нескольких столбцов в новый или выполнение какого-либо категориального кодирования. Фильтрация DataFrame также относится к этой группе. Операции, которые изменяют характер DataFrame, такие как операции группировки или простое вычисление агрегатов, таких как сумма или среднее значение столбца, с нетерпением оцениваются. Такой рабочий процесс обеспечивает хороший баланс между производительностью и удобством при интерактивном исследовании или анализе данных.

Когда процесс преобразования данных или конвейер данных четко определены, может потребоваться оптимизация с учетом производительности. Аргумент delay=True, поддерживаемый многими методами Vaex, позволяет планировать выполнение операций параллельно. Это позволяет Vaex заранее построить вычислительный график и попытаться найти наиболее эффективный способ вычисления результата. Вернемся к предыдущему примеру, но теперь запланируем одновременное выполнение всех операций:

with vaex.progress.tree('rich'):
    result_1 = df.groupby(df.passenger_count, agg='count', delay=True)
    result_2 = df.groupby(df.vendor_id, agg=vaex.agg.sum('fare_amount'), delay=True)
    result_3 = df.tip_amount.mean(delay=True)
    df.execute()

Мы видим, что, явно используя отложенные оценки, мы можем повысить производительность и сократить количество раз, которое нам нужно просмотреть данные. В этом случае мы перешли от 5 проходов по данным к всего 2 проходам при использовании отложенных вычислений, что привело к увеличению скорости примерно на 30%. Вы можете найти больше примеров по асинхронному программированию с помощью Vaex в этом руководстве.

5. Кэширование

Из-за своей скорости Vaex часто используется в качестве серверной части для информационных панелей и приложений для работы с данными, особенно тех, которым необходимо обрабатывать большие объемы данных. При использовании приложения для работы с данными некоторые операции часто выполняются повторно с одними и теми же или похожими подмножествами данных. Например, пользователи начнут с одной и той же домашней страницы и выберут общие или популярные варианты, прежде чем углубляться в данные. В таких случаях часто полезно кэшировать результаты операций, чтобы их можно было быстро получить при необходимости. Это особенно полезно, когда данные большие, а операции сложные. Vaex реализует расширенный гранулярный механизм кэширования, который позволяет кэшировать результаты отдельных операций, которые впоследствии можно повторно использовать. В следующем примере показано, как это работает:

vaex.cache.on()

with vaex.progress.tree('rich'):
    result_1 = df.passenger_count.nunique()

with vaex.progress.tree('rich'):
    result_2 = df.groupby(df.passenger_count, agg=vaex.agg.mean('trip_distance'))

Сочетание механизма кэширования с отложенными вычислениями приводит к превосходной производительности. Поэтому неудивительно, почему Vaex часто используется в качестве серверной части для приложений данных.

6. Ранняя остановка

Vaex имеет довольно эффективный способ определения мощности столбца. При использовании методов unique, nunique или groupby также можно указать аргумент limit. Это устанавливает предел ожидаемого количества уникальных значений в столбце. Если предел превышен, Vaex может вызвать исключение или вернуть найденные на данный момент уникальные значения.

Это особенно полезно при создании приложений данных, которые позволяют пользователям свободно выбирать столбцы из набора данных. Вместо того, чтобы тратить время и ресурсы на массивную операцию группировки, которую в конечном итоге нельзя будет визуализировать, мы можем остановиться раньше и без перегрузки памяти и предупредить пользователя или автоматически выбрать другой путь для обработки или визуализации. данные.

В приведенном ниже примере мы устанавливаем для limit значение 100 и указываем Vaex не вызывать исключение при достижении предела, а просто возвращать результат, который в данном случае является просто пределом:

result = df.pickup_longitude.nunique(limit=100, limit_raise=False)
print(f'result: {result}')

7. Облачная поддержка

По мере роста наборов данных становится все более распространенным и практичным хранить их в облаке, а локально хранить только те части данных, над которыми мы активно работаем. Vaex вполне дружелюбен к облачным технологиям — он может легко загружать (потоковые) данные из любого общедоступного облачного хранилища. Что еще более важно, будут получены только те данные, которые необходимы. Например, при выполнении `df.head()` будут выбраны только первые 5 строк. Чтобы вычислить среднее значение столбца, нужны все данные из этого конкретного столбца, и Vaex будет передавать эту часть данных, но не будет извлекать данные из любого другого столбца, пока они не потребуются. Это довольно эффективно и практично:

df_cloud = vaex.open('gs://vaex-data/airlines/us_airline_data_1988_2019.hdf5')

print('Size of data:')
print(f'Number of rows: {df_cloud.shape[0]:,}')
print(f'Number of columns: {df_cloud.shape[1]}')

df_cloud.head()

8. Использование графического процессора (NVIDIA, RADEON, M1, M2)

И последнее, но не менее важное: знаете ли вы, что Vaex может использовать графические процессоры для ускорения вычисления выражений, требующих значительных вычислительных ресурсов? Что еще более впечатляет, он поддерживает NVIDIA для платформ Windows и Linux, а также Radeon и Apple Silicon для Mac OS. Специалисты по работе с данными наконец-то могут воспользоваться преимуществами устройств Apple Silicon для анализа данных!

В приведенном ниже примере мы определяем функцию, которая вычисляет расстояние по дуге между двумя точками на сфере. Это довольно сложная математическая операция, включающая много тригонометрии и арифметики. Давайте попробуем вычислить среднее расстояние по дуге для всех поездок на такси в нашем наборе данных о такси в Нью-Йорке, который содержит более 1 миллиарда строк:

print(f'Number of rows: {df.shape[0]:,}')

def arc_distance(theta_1, phi_1, theta_2, phi_2):
    temp = (np.sin((theta_2-theta_1)/2*np.pi/180)**2
           + np.cos(theta_1*np.pi/180)*np.cos(theta_2*np.pi/180) 
           * np.sin((phi_2-phi_1)/2*np.pi/180)**2)
    distance = 2 * np.arctan2(np.sqrt(temp), np.sqrt(1-temp))
    return distance * 3958.8

df['arc_distance_miles_numpy'] = arc_distance(df.pickup_longitude, df.pickup_latitude, 
                                              df.dropoff_longitude, df.dropoff_latitude)

# Requires cupy and NVDIA GPU
df['arc_distance_miles_cuda'] = df['arc_distance_miles_numpy'].jit_cuda()

# Requires metal2 support on MacOS (Apple Silicon and Radeon GPU supported)
# df['arc_distance_miles_metal'] = df['arc_distance_miles_numpy'].jit_metal()

result_cpu = df.arc_distance_miles_numpy.mean(progress='widget')
result_gpu = df.arc_distance_miles_cuda.mean(progress='widget')
print(f'CPU: {result_cpu:.3f} miles')
print(f'GPU: {result_gpu:.3f} miles')

Мы видим, что с помощью графического процессора мы можем получить неплохой прирост производительности. Если у вас нет доступа к графическому процессору, не беспокойтесь! Vaex также поддерживает компиляцию Just-In-Time с помощью Numba и Pythran, что также может обеспечить значительный прирост производительности.

С этим последним пунктом мы подошли к концу этой статьи. Я надеюсь, что вы попробуете хотя бы некоторые из этих функций, и я надеюсь, что они помогут вам в анализе данных так же, как и мне. До скорого!