Parquet - это формат файла с открытым исходным кодом от Apache для инфраструктуры Hadoop. Ну, он начинался как формат файла для Hadoop, но с тех пор стал очень популярным, и даже поставщики облачных услуг, такие как AWS, начали поддерживать этот формат файла. Это могло означать только то, что Parquet должен что-то делать правильно. В этом посте мы увидим, что именно представляет собой формат файла Parquet, а затем мы увидим простой пример Java для создания или записи файлов Parquet.

Введение в формат файла паркета

Мы храним данные в виде строк в традиционном подходе. Но Parquet использует другой подход, когда он объединяет данные в столбцы перед их сохранением. Это позволяет лучше сжать данные для хранения, а также повысить производительность запросов. Кроме того, благодаря такому подходу к хранению формат может обрабатывать наборы данных с большим количеством столбцов.

Из-за всех этих функций в большинстве проектов с большими данными используется формат файлов Parquet. Файлы паркета также уменьшают объем необходимого места для хранения. В большинстве случаев мы используем запросы с определенными столбцами. Прелесть формата файла в том, что все данные столбца смежны, поэтому запросы выполняются быстрее.

Из-за оптимизации и популярности формата файлов даже Amazon предоставляет встроенные функции для преобразования входящих потоков данных в файлы Parquet перед сохранением в S3 (который действует как озеро данных). Я широко использовал это с Amazon’s’Athena и некоторыми сервисами Apache. Для получения дополнительной информации о файловой системе Parquet вы можете обратиться к официальной документации.

Зависимости

Прежде чем мы начнем писать код, нам нужно позаботиться о зависимостях. Поскольку это проект Maven Spring Boot, мы перечислим все наши зависимости в файле pom.xml:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.apache.parquet</groupId>
        <artifactId>parquet-hadoop</artifactId>
        <version>1.8.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-core</artifactId>
        <version>1.2.1</version>
    </dependency>
</dependencies>

Как видите, мы добавляем стартовый пакет Spring Boot и несколько других зависимостей Apache. Для этого примера это все, что нам нужно.

Свойства

Как всегда, у нас есть файл application.properties, в котором мы указываем все свойства. В этом примере нам нужны только два свойства: одно задает путь к файлу схемы, а другое указывает путь к выходному каталогу. Подробнее о схеме мы узнаем чуть позже. Итак, файл свойств выглядит так:

schema.filePath=
output.directoryPath=

А поскольку это приложение Spring Boot, мы будем использовать аннотацию @Value для чтения этих значений в коде:

@Value("${schema.filePath}")
private String schemaFilePath;

@Value("${output.directoryPath}")
private String outputDirectoryPath;

Схема паркета

Нам нужно указать схему данных, которые мы собираемся записать в файл Parquet. Это связано с тем, что при создании двоичного файла Parquet тип данных каждого столбца также сохраняется. На основе схемы, которую мы предоставляем в файле схемы, код соответствующим образом отформатирует данные перед их записью в файл Parquet.

В этом примере я делаю все просто, как вы можете видеть из файла схемы ниже:

message m { 
    required INT64 id; 
    required binary username; 
    required boolean active; 
}

Позвольте мне объяснить, что это такое. Первый параметр имеет тип INT64, который является целым числом, и называется id. Второе поле имеет двоичный тип, который представляет собой не что иное, как строку. Мы называем это полем имя пользователя. Третье - это логическое поле active. Это довольно простой пример. Но, к сожалению, если в ваших данных есть сотня столбцов, вам придется объявить их все здесь.

Ключевое слово required перед объявлением поля используется для проверки, чтобы убедиться, что для этого поля указано значение. Это необязательно, и вы можете удалить его для полей, которые не являются обязательными.

Паркетный писатель

Время отказа от ответственности, я не писал эти два класса, которые обсуждаю в этом разделе. Несколько месяцев назад, когда я исследовал это, я нашел эти два класса на StackOverFlow. Я не знаю, кто это написал, но я просто везде использовал эти два класса. Но да, я переименовал классы в соответствии с проектом.

Во-первых, класс CustomParquetWriter. Это расширяет класс ParquetWriter, предоставляемый Apache. Код этого класса выглядит следующим образом:

public class CustomParquetWriter extends ParquetWriter<List<String>> {

    public CustomParquetWriter(
            Path file,
            MessageType schema,
            boolean enableDictionary,
            CompressionCodecName codecName
    ) throws IOException {
        super(file, new CustomWriteSupport(schema), codecName, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary, false);
    }
}

Здесь не о чем говорить. Следующим идет CustomWriteSupport, который вы можете увидеть как второй параметр конструктора super () в приведенном выше фрагменте. Здесь происходит много всего. Вы можете проверить репо для всего класса и посмотреть, что он делает.

По сути, класс проверяет схему, чтобы определить тип данных каждого поля. После этого с использованием экземпляра класса RecordConsumer данные записываются в файл. Я не буду много говорить об этих двух классах, потому что а) я их не писал и б) код достаточно прост, чтобы любой мог его понять.

Подготовка данных для файла Parquet

Давайте подготовим данные для записи в файлы Parquet. Список строк представляет собой один набор данных для файла Parquet. Каждый элемент в этом списке будет значением корректирующего поля в файле схемы. Например, предположим, что у нас есть следующий список:

Глядя на файл схемы, мы можем сказать, что первое значение в массиве - это идентификатор, второе значение - это имя, а третье значение - это логический флаг для активного поля.

Итак, в нашем коде у нас будет список из String для представления нескольких строк. Да, вы правильно прочитали, это список строк:

List<List<String>> columns = getDataForFile();

Давайте посмотрим на функцию, чтобы увидеть, как мы генерируем данные:

private List<List<String>> getDataForFile() {
    List<List<String>> data = new ArrayList<>();

    List<String> parquetFileItem1 = new ArrayList<>();
    parquetFileItem1.add("1");
    parquetFileItem1.add("Name1");
    parquetFileItem1.add("true");

    List<String> parquetFileItem2 = new ArrayList<>();
    parquetFileItem2.add("2");
    parquetFileItem2.add("Name2");
    parquetFileItem2.add("false");

    data.add(parquetFileItem1);
    data.add(parquetFileItem2);

    return data;
}

Это довольно просто, правда? Тогда пойдем дальше.

Получение файла схемы

Как мы уже говорили, у нас есть файл схемы. Нам нужно включить эту схему в код, в частности, как экземпляр класса MessageType. Посмотрим, как это сделать:

MessageType schema = getSchemaForParquetFile();

...

private MessageType getSchemaForParquetFile() throws IOException {
    File resource = new File(schemaFilePath);
    String rawSchema = new String(Files.readAllBytes(resource.toPath()));
    return MessageTypeParser.parseMessageType(rawSchema);
}

Как видите, мы просто читаем файл как строку, а затем анализируем эту строку с помощью метода parseMessageType () в классе MessageTypeParser, предоставляемом библиотекой Apache.

Получение паркетного писателя

Это почти последний шаг в процессе. Нам просто нужно получить экземпляр класса CustomParquetWriter, который мы обсуждали ранее. Здесь мы также указываем путь к выходному файлу, в который будет записывать писатель. Код для этого также довольно прост:

CustomParquetWriter writer = getParquetWriter(schema);

...

private CustomParquetWriter getParquetWriter(MessageType schema) throws IOException {
    String outputFilePath = outputDirectoryPath+ "/" + System.currentTimeMillis() + ".parquet";
    File outputParquetFile = new File(outputFilePath);
    Path path = new Path(outputParquetFile.toURI().toString());
    return new CustomParquetWriter(
            path, schema, false, CompressionCodecName.SNAPPY
    );
}

Запись данных в файл паркета

Это последний шаг, нам просто нужно записать данные в файл. Мы зациклим список созданного нами списка и запишем каждый список в файл, используя средство записи, созданное на предыдущем шаге:

for (List<String> column : columns) {
    writer.write(column);
}
logger.info("Finished writing Parquet file.");

writer.close();

Вот и все. Вы можете перейти в выходной каталог и проверить созданный файл. Например, вот что я получил после запуска этого проекта:

Если вы хотите начать непосредственно с рабочего примера, вы можете найти проект Spring Boot в моем репозитории Github. А если у вас есть сомнения или вопросы, не стесняйтесь спрашивать меня в комментариях.

Первоначально опубликовано на https://blog.contactsunny.com 7 апреля 2020 г.