Большие данные в тренде. Компаниям приходится оперировать огромными объемами данных, чтобы конкурировать с другими. Например, эта информация используется, чтобы показывать вам релевантную рекламу и рекомендовать вам услуги, которые могут вас заинтересовать.

Проблема с программными системами больших данных заключается в их сложности. Тестирование становится жестким. Как вы могли бы проверить поведение приложения локально, когда оно настроено для подключения к кластеру HDFS?

В этой статье я покажу вам, как создать приложение Spring Boot, которое загружает данные из Apache Hive через Apache Spark в базу данных Aerospike. Более того, я даю вам рецепт написания интеграционных тестов для таких сценариев, которые можно запускать либо локально, либо во время выполнения CI-конвейера. Примеры кода взяты из этого репозитория.

Во-первых, давайте разберемся с некоторыми основными понятиями используемого нами стека больших данных. Не волнуйтесь, это не займет много времени. Но необходимо понять основную идею.

Основы HDFS

HDFS (распределенная файловая система Hadoop) — это распределенная файловая система, предназначенная для работы на многих физических серверах. Итак, файл в HDFS — это абстракция, скрывающая сложность хранения и репликации данных между несколькими узлами. Зачем нам нужна HDFS? Есть некоторые причины.

Аппаратные сбои

Сбой жестких дисков. Это реальность, с которой нам приходится иметь дело. Если файл разделен между несколькими узлами, отдельные сбои не повлияют на все данные. Кроме того, данные реплицируются в HDFS. Таким образом, даже после сбоя диска информацию можно восстановить из других источников.

Действительно большие файлы

HDFS позволяет собрать сеть из не очень мощных машин в огромную систему. Например, если у вас есть 100 узлов с дисковым хранилищем по 1 ТБ на каждом, то у вас есть 100 ТБ пространства HDFS. Если коэффициент репликации равен 3, можно хранить один файл размером 33 ТБ.

Не говоря уже о том, что некоторые локальные файловые системы не поддерживают такие большие файлы, даже если у вас есть свободное место на диске.

Скорость чтения

Если вы будете читать файл последовательно, это займет у вас N. Но если файл разбит на 10 фрагментов между 10 узлами, вы можете получить его содержимое за N/10 времени! Потому что каждый узел может читать свой фрагмент параллельно. Итак, HDFS — это не только безопасность. Речь идет о быстроте.

Я не учел время, затрачиваемое на сетевое общение. Но если файлы большие, эта часть будет незначительной.

Основы Apache Hive

Apache Hive — это база данных, работающая поверх HDFS. Это позволяет запрашивать данные с помощью HQL (SQL-подобного языка).

Обычные базы данных (например, PostgreSQL, Oracle) действуют как уровень абстракции над локальной файловой системой. В то время как Apache Hive действует как абстракция над HDFS. Вот и все.

Основы Apache Spark

Apache Spark — это платформа для работы и преобразования огромных объемов данных. Основная идея заключается в том, что рабочие процессы Apache Spark выполняются на нескольких узлах и сохраняют промежуточные результаты в оперативной памяти. Он написан на Scala, но также поддерживает Java и Python. Взгляните на схему ниже. Это обычное представление пакетного задания Apache Spark.

Apache Spark загружает данные из Data Producer, выполняет над ними какие-то операции и помещает результат в Data Consumer (в нашем случае Apache Hive — производитель данных, а Aerospike — потребитель данных). Приложение Apache Spark представляет собой обычный файл .jar, содержащий логику преобразования. Взгляните на пример ниже.

Это простое приложение для подсчета слов. Во-первых, мы загружаем содержимое файла raw_data.txt HDFS. Затем мы разделяем каждую строку на " ", присваиваем 1 каждому слову и уменьшаем результат по словам, чтобы суммировать целые числа. Затем полученные пары сохраняются в word_count.txt.

Поток аналогичен Java Stream API. Разница в том, что каждое лямбда-выражение выполняется на рабочих процессах. Итак, Spark передает код на удаленные машины, производит расчет и возвращает полученные результаты. Если мы должны достаточное количество воркеров, мы можем перейти к объему данных, который измеряется терабайтами или даже зеттабайтами.

Подход Apache Spark к доставке кода в данные имеет некоторые недостатки. Мы обсудим это, когда приступим к разработке.

Еще одним важным аспектом является лень. Как и Stream API, Apache Spark не запускает никаких вычислений до вызова терминальной операции. В данном случае это reduceByKey. Остальные операции строят правила конвейера, но ничего не запускают.

Конфигурация сборки

Начнем процесс разработки. Во-первых, нам нужно выбрать версию Java. На момент написания последней стабильной версии Apache Spark является 3.2.1. Он поддерживает Java 11. Так что мы будем его использовать.

В настоящее время Apache Spark не поддерживает Java 17. Убедитесь, что вы не используете его для запуска интеграционных тестов. В противном случае вы получите странные сообщения об ошибках.

Проект загружается с помощью Spring Initializr. Здесь нет ничего особенного. Но список зависимостей следует уточнить.

Разрешение зависимостей

Основные зависимости

Сначала идут зависимости Apache Spark. Артефакт spark-core является корнем. spark-hive позволяет получать данные из Apache Hive. А зависимость spark-sql дает нам возможность запрашивать данные из Apache Hive с использованием SQL.

Обратите внимание, что все артефакты должны иметь одну и ту же версию (в нашем случае это 3.2.1). На самом деле версия зависимостей Apache Spark должна совпадать с версией, на которой работает рабочий кластер в вашей компании.

Все зависимости Spark должны быть помечены как compileOnly. Это означает, что они не будут включены в собранный файл .jar. Apache Spark предоставит необходимые зависимости во время выполнения. Если вы включите их как область действия implementation, это может привести к ошибкам жесткого отслеживания во время выполнения.

Тогда у нас есть зависимость aerospike-client. Вы, наверное, заметили, что группа org.slf4j везде исключена и включена как зависимость compileOnly. Мы поговорим об этом позже, когда перейдем к средству ведения журнала Apache Spark.

Проверка зависимостей

И, наконец, вот артефакты для тестирования. Apache Spark включены как testImplementation. Потому что интеграционные тесты запустят локальный узел Spark. Таким образом, они необходимы во время выполнения. slf4j-api также является зависимостью времени выполнения. Testcontainers будут использоваться для запуска экземпляра Aerospike. janino требуется Apache Spark во время выполнения задания. И нам нужен Apache Derby, чтобы настроить Apache Hive для локального запуска. Мы скоро доберемся до этого.

Конфигурация ведения журнала

Apache Spark применяет log4j с оболочкой slf4j. Но регистратор Spring Boot по умолчанию — logback. Эта настройка приводит к исключениям во время инициализации контекста Spring из-за нескольких средств ведения журнала, присутствующих в пути к классам. Самый простой способ решить эту проблему — исключить все автоматически настроенные функции ведения журнала Spring Boot. Это не имеет большого значения. В любом случае, Apache Spark предоставляет собственную slf4j реализацию во время выполнения. Итак, нам просто нужно включить эту зависимость как compileOnly. Этого достаточно.

Исключить logback из проекта Spring Boot легко с помощью Gradle. Взгляните на пример ниже.

Возможных application.yml проблем

Исключение snakeyml требует особого внимания. Spring Boot использует библиотеку для анализа свойств из .yml файлов (т. е. application.yml). Некоторые версии Apache Spark используют одну и ту же библиотеку для внутренних операций.

Дело в том, что версии, требуемые Spring Boot и Apache Spark, различаются. Если вы исключите его из зависимости Spring Boot и полагаетесь на тот, который предоставляется Apache Spark, вы столкнетесь с NoSuchMethodError (Spring Boot вызывает метод, отсутствующий в версии, предоставляемой Apache Spark). Итак, я бы рекомендовал придерживаться формата .properties и удалить автоконфигурацию Spring Boot YAML. Это поможет вам избежать ненужных трудностей. Взгляните на пример кода ниже.

толстая банка

Результат .jar будет отправлен в кластер Apache Spark (например, spark-submit command). Таким образом, он должен содержать все артефакты времени выполнения. К сожалению, стандартная упаковка Spring Boot не размещает зависимости так, как этого ожидает Apache Spark. Итак, мы будем использовать плагин для Gradle shadow-jar. Взгляните на пример ниже.

Теперь мы можем запустить все тесты и собрать артефакт с помощью команды ./gradlew test shadowJar.

Начало разработки

Теперь мы можем перейти к процессу разработки.

Конфигурация Apache Spark

Нам нужно объявить JavaSparkContext и SparkSession. Первый — это основной Apache Spark для всех операций. Пока SparkSession является частью spark-sql проектов. Это позволяет нам запрашивать данные с помощью SQL (что очень удобно для Apache Hive). Взгляните на конфигурацию Spring ниже.

SparkConf определяет ключи конфигурации для задания Apache Spark. Как вы заметили, есть два бина для разных профилей Spring. LOCAL используется для интеграционного тестирования, а PROD применяется в производственной среде. Конфигурация PROD не объявляет никаких свойств, потому что обычно они передаются как аргументы командной строки в сценарии оболочки spark-submit.

Напротив, профиль LOCAL определяет набор свойств по умолчанию, необходимых для правильной работы. Вот самые важные из них.

  1. setMaster("local") указывает Apache Spark запустить один локальный узел.
  2. javax.jdo.option.ConnectionURL и javax.jdo.option.ConnectionDriverName объявляют соединение JDBC для метахранилища Apache Hive. Вот почему мы добавили Apache Derby в качестве зависимости проекта.
  3. spark.sql.catalogImplementation означает, что локальные файлы должны храниться в формате, совместимом с Apache Hive.
  4. spark.sql.warehouse.dir — это каталог для хранения данных Apache Hive. Здесь мы используем временный каталог.

JavaSparkContext принимает определенные SparkConf в качестве аргументов конструктора. Тем временем SparkSession обертывает существующий JavaSparkContext. Обратите внимание, что поддержку Apache Hive следует включать вручную (enableHiveSupport).

Создание таблиц Apache Hive

Когда мы отправляем приложение в производственный кластер Apache Spark, нам, вероятно, не потребуется создавать какие-либо таблицы Apache Hive. Скорее всего, таблицы уже созданы кем-то другим. И наша цель — выделить строки и перенести данные в другое хранилище. Но когда мы запускаем интеграционные тесты локально (или в среде CI), по умолчанию таблиц нет. Значит, нам нужно их как-то создать.

В этом проекте мы работаем с одной таблицей — media.subscriber_info. Он состоит из двух столбцов. MSISDN (номер телефона) и идентификатор абонента.

Перед каждым прогоном теста мы должны удалять предыдущие данные и добавлять новые строки, чтобы гарантировать согласованность правил. Самый простой способ добиться этого — объявить скрипты для создания и удаления таблиц. Мы будем хранить их в каталоге resources. Взгляните на структуру ниже.

V1_media.hql

Создает базу данных media, если она отсутствует.

create database if not exists media

V2__media.subscriber_info.hql

Создает таблицу subscriber_info, если она отсутствует.

create table if not exists media.subscriber_info (
  subscriber_id string,
  msisdn string
)
row format delimited
fields terminated by ','
lines terminated by '\n'
stored as textfile

УДАЛИТЬ V1__mediatv_dds.subscriber_info.hql

Удаляет таблицу subscriber_info.

drop table if exists media.subscriber_info

Префиксы V[N] не обязательны. Ставлю их для того, чтобы каждый новый скрипт таблицы выполнялся как последний. Полезно сделать так, чтобы тесты работали детерминировано.

Хорошо, теперь нам нужен обработчик для обработки этих HQL-запросов. Взгляните на пример ниже.

Первое, на что следует обратить внимание, — это использование @Profile(LOCAL). Потому что нам не нужно создавать или удалять таблицы в рабочей среде.

Методы createTables и dropTables предоставляют список ресурсов, содержащих необходимые запросы.

getResources — это служебная функция, которая считывает файлы из пути к классам. Ознакомиться с реализацией можно здесь.

Итак, теперь мы готовы написать бизнес-код!

Бизнес-код

Фасад

Основной интерфейс EnricherService

Мы ожидаем, что у него может быть много реализаций. Каждый из них представляет собой шаг во всем пакетном процессе.

Затем у нас есть EnricherServiceFacade, который инкапсулирует все реализации EnricherService и запускает их одну за другой.

Мы пытаемся запустить каждый предоставленный шаг обогащения. Если какой-то из них не работает, мы бросаем исключение, которое объединяет все ошибки в цельный кусок.

Наконец, нам нужно указать Spring выполнять EnricherServiceFacade.proceedEnrichment при запуске приложения. Мы могли бы добавить его непосредственно в метод main, но это не способ Spring. Следовательно, это усложняет тестирование. Лучший вариант - @EventListener.

Метод proceedEnrichment вызывается при запуске Spring context. Кстати, только активный профиль PROD вызовет задание.

Реализация EnricherService

Мы будем иметь дело с одной реализацией EnricherService. Он просто выбирает все строки из таблицы media.subcriber_info и помещает результат в базу данных Aerospike. Взгляните на фрагмент кода ниже.

Есть несколько моментов, которые необходимо уточнить.

Сериализация

Apache Spark применяет стандартный механизм сериализации Java. Итак, любые зависимости, используемые внутри лямбд (map, filter, groupBy, forEach и т. д.), должны реализовывать интерфейс Serializable. В противном случае вы получите NotSerializableException во время выполнения.

У нас есть ссылка на AerospikeProperties внутри обратного вызова foreachPartition. Следовательно, этот класс и сам SubscriberIdEnricherService должны быть разрешены для сериализации (поскольку последний сохраняет AerospikeProperties как поле). Если зависимость не используется ни в одной лямбде Apache Spark, вы можете пометить ее как transient.

И, наконец, ручное назначение serialVersionUID имеет решающее значение. Причина в том, что Apache Spark может несколько раз сериализовать и десериализовать переданные объекты. И нет никакой гарантии, что каждый раз автоматически сгенерированный serialVersionUID будет одним и тем же. Это может быть причиной жесткого отслеживания плавающих ошибок. Чтобы предотвратить это, вы должны объявить serialVersionUID самостоятельно.

Еще лучший подход — заставить компилятор проверять наличие поля serialVersionUID в любых классах Serializable. В этом случае вам нужно пометить -Xlint:serial предупреждение как ошибку. Взгляните на пример Gradle.

tasks.withType(JavaCompile) {
    options.compilerArgs << "-Xlint:serial" << "-Werror" 
}

Создание клиента Aerospike

К сожалению, клиент Java Aerospike не реализует интерфейс Serializable. Итак, мы должны создать его экземпляр внутри лямбда-выражения. В этом случае объект будет создан непосредственно на рабочем узле. Это делает сериализацию избыточной.

Должен признать, что Aerospike предоставляет Aerospike Connect Framework, который позволяет декларативно передавать данные через Apache Spark без создания каких-либо Java-клиентов. В любом случае, если вы хотите его использовать, вам нужно установить упакованную библиотеку напрямую в кластер Apache Spark. Нет никакой гарантии, что у вас будет такая возможность в вашей ситуации. Поэтому я опускаю этот сценарий.

Разметка

Класс Dataset имеет метод foreach, который просто выполняет заданную лямбду для каждой текущей строки. Однако, если вы инициализируете какой-либо тяжелый ресурс внутри этого обратного вызова (например, соединение с базой данных), новый будет создан для каждой строки (в некоторых случаях могут быть миллиарды строк). Не очень эффективно, не так ли?

Метод foreachPartition работает немного по-другому. Apache Spark выполняет его один раз для раздела Dataset. Он также принимает Iterator<Row> в качестве аргумента. Итак, внутри лямбды мы можем инициализировать «тяжелые» ресурсы (например, AerospikeClient) и применять их для вычислений каждого Row внутри итератора.

Размер раздела рассчитывается автоматически на основе источника входных данных и конфигурации кластера Apache Spark. Хотя вы можете установить его вручную, вызвав метод repartition. В любом случае, это выходит за рамки статьи.

Тестирование

Настройка Aerospike

Итак, мы написали бизнес-код. Как мы это протестируем? Во-первых, давайте объявим настройку Aerospike для Testcontainers. Взгляните на фрагмент кода ниже.

Класс IntegrationSuite используется как родительский для всех интеграционных тестов. Внутренний класс IntegrationSuite.Initializer используется в качестве инициализатора контекста Spring. Фреймворк вызывает его, когда все свойства и определения bean-компонентов уже загружены, но bean-компоненты еще не созданы. Это позволяет нам переопределять некоторые свойства во время выполнения.

Мы объявляем контейнер Aerospike как GenericContainer, потому что библиотека не обеспечивает готовой поддержки базы данных. Затем внутри метода initialize мы получаем хост и порт контейнера и назначаем их свойству aerospike.hosts.

Утилиты Apache Hive

Перед каждым тестовым методом мы предполагаем удалить все данные из Apache Hive и добавить новые строки, необходимые для текущего сценария. Таким образом, тесты не будут влиять друг на друга. Давайте объявим собственный тестовый фасад для Apache Hive. Взгляните на фрагмент кода ниже.

Есть всего два метода. cleanHive удаляет все существующие и создает их заново. Поэтому все предыдущие данные стираются. insertInto сложный. Он служит для вставки новых строк в Apache Hive статически типизированным способом. Как это делается? Прежде всего, давайте осмотрим интерфейс HiveTable<T>.

Как видите, это обычный функциональный интерфейс Java. Хотя реализации не так очевидны.

Класс принимает SparkSession в качестве зависимости конструктора. SubscriberInfo.Values — это общий аргумент. Класс представляет структуру данных, содержащую значения для вставки. И, наконец, реализация values выполняет фактическое создание новой строки.

Ключом является статический метод subscriberInfo. В чем причина возврата Function<SparkSession, SubscriberInfo>? Его комбинация с TestHiveUtils.insertInto дает нам статически типизированный оператор INSERT INTO. Взгляните на пример кода ниже.

Элегантное решение, вам не кажется?

Срез теста интеграции Spark

Интеграционные тесты Spring требуют определенной конфигурации. Разумно объявить его один раз и использовать повторно. Взгляните на фрагмент кода ниже.

Внутри SpringBootTest мы перечислили все bean-компоненты, которые используются во время выполнения тестов.

TestAerospikeFacade — это всего лишь тонкая оболочка клиента Java Aerospike для целей тестирования. Его реализация довольно проста, но вы можете ознакомиться с исходным кодом по этой ссылке.

EnricherServiceTestConfiguration — это конфигурация Spring, объявляющая все реализации интерфейса EnricherService. Взгляните на пример ниже.

Я хочу отметить, что все реализации EnricherService должны быть перечислены внутри класса. Если мы применим разные конфигурации для каждого набора тестов, контекст Spring будет перезагружен. В основном это не проблема. Но использование Apache Spark создает препятствия. Видите ли, когда создается JavaSparkContext, он запускает локальный узел Apache Spark. Но если мы создадим его дважды в течение жизненного цикла приложения, это приведет к исключению. Самый простой способ решить эту проблему — убедиться, что JavaSparkContext будет создан только один раз.

Теперь мы можем перейти к процессу тестирования.

Пример интеграционного теста

Вот простой интеграционный тест, который вставляет две строки в Apache Spark и проверяет, что соответствующие две записи создаются в Aerospike в течение 10 секунд. Взгляните на фрагмент кода ниже.

Если вы все настроите правильно, тест будет пройден.

Весь исходный код теста доступен по этой ссылке.

Заключение

Это в основном все, что я хотел рассказать вам о тестировании интеграции Apache Hive, Apache Spark и Aerospike с использованием Spring Boot. Как видите, мир больших данных не так уж и сложен. Все примеры кода взяты из этого репозитория. Вы можете клонировать его и поэкспериментировать с тестами самостоятельно.

Если у вас есть какие-либо вопросы или предложения, пожалуйста, оставьте свои комментарии ниже. Спасибо за прочтение!

Ресурсы

  1. Репозиторий с примерами
  2. HDFS (распределенная файловая система Hadoop)
  3. Апачский улей
  4. Апач Спарк
  5. Апач Дерби
  6. База данных Aerospike
  7. Аэроспайк Коннект Фреймворк
  8. API Java-потока
  9. Инициализация весны
  10. Пружинные профили
  11. Тестконтейнеры
  12. Gradle plugin shadow-jar