Parquet - это формат файла с открытым исходным кодом от Apache для инфраструктуры Hadoop. Ну, он начинался как формат файла для Hadoop, но с тех пор стал очень популярным, и даже поставщики облачных услуг, такие как AWS, начали поддерживать этот формат файла. Это могло означать только то, что Parquet должен что-то делать правильно. В этом посте мы увидим, что именно представляет собой формат файла Parquet, а затем мы увидим простой пример Java для создания или записи файлов Parquet.
Введение в формат файла паркета
Мы храним данные в виде строк в традиционном подходе. Но Parquet использует другой подход, когда он объединяет данные в столбцы перед их сохранением. Это позволяет лучше сжать данные для хранения, а также повысить производительность запросов. Кроме того, благодаря такому подходу к хранению формат может обрабатывать наборы данных с большим количеством столбцов.
Из-за всех этих функций в большинстве проектов с большими данными используется формат файлов Parquet. Файлы паркета также уменьшают объем необходимого места для хранения. В большинстве случаев мы используем запросы с определенными столбцами. Прелесть формата файла в том, что все данные столбца смежны, поэтому запросы выполняются быстрее.
Из-за оптимизации и популярности формата файлов даже Amazon предоставляет встроенные функции для преобразования входящих потоков данных в файлы Parquet перед сохранением в S3 (который действует как озеро данных). Я широко использовал это с Amazon’s’Athena и некоторыми сервисами Apache. Для получения дополнительной информации о файловой системе Parquet вы можете обратиться к официальной документации.
Зависимости
Прежде чем мы начнем писать код, нам нужно позаботиться о зависимостях. Поскольку это проект Maven Spring Boot, мы перечислим все наши зависимости в файле pom.xml:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.8.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-core</artifactId>
<version>1.2.1</version>
</dependency>
</dependencies>
Как видите, мы добавляем стартовый пакет Spring Boot и несколько других зависимостей Apache. Для этого примера это все, что нам нужно.
Свойства
Как всегда, у нас есть файл application.properties, в котором мы указываем все свойства. В этом примере нам нужны только два свойства: одно задает путь к файлу схемы, а другое указывает путь к выходному каталогу. Подробнее о схеме мы узнаем чуть позже. Итак, файл свойств выглядит так:
schema.filePath=
output.directoryPath=
А поскольку это приложение Spring Boot, мы будем использовать аннотацию @Value для чтения этих значений в коде:
@Value("${schema.filePath}")
private String schemaFilePath;
@Value("${output.directoryPath}")
private String outputDirectoryPath;
Схема паркета
Нам нужно указать схему данных, которые мы собираемся записать в файл Parquet. Это связано с тем, что при создании двоичного файла Parquet тип данных каждого столбца также сохраняется. На основе схемы, которую мы предоставляем в файле схемы, код соответствующим образом отформатирует данные перед их записью в файл Parquet.
В этом примере я делаю все просто, как вы можете видеть из файла схемы ниже:
message m {
required INT64 id;
required binary username;
required boolean active;
}
Позвольте мне объяснить, что это такое. Первый параметр имеет тип INT64, который является целым числом, и называется id. Второе поле имеет двоичный тип, который представляет собой не что иное, как строку. Мы называем это полем имя пользователя. Третье - это логическое поле active. Это довольно простой пример. Но, к сожалению, если в ваших данных есть сотня столбцов, вам придется объявить их все здесь.
Ключевое слово required перед объявлением поля используется для проверки, чтобы убедиться, что для этого поля указано значение. Это необязательно, и вы можете удалить его для полей, которые не являются обязательными.
Паркетный писатель
Время отказа от ответственности, я не писал эти два класса, которые обсуждаю в этом разделе. Несколько месяцев назад, когда я исследовал это, я нашел эти два класса на StackOverFlow. Я не знаю, кто это написал, но я просто везде использовал эти два класса. Но да, я переименовал классы в соответствии с проектом.
Во-первых, класс CustomParquetWriter. Это расширяет класс ParquetWriter, предоставляемый Apache. Код этого класса выглядит следующим образом:
public class CustomParquetWriter extends ParquetWriter<List<String>> {
public CustomParquetWriter(
Path file,
MessageType schema,
boolean enableDictionary,
CompressionCodecName codecName
) throws IOException {
super(file, new CustomWriteSupport(schema), codecName, DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE, enableDictionary, false);
}
}
Здесь не о чем говорить. Следующим идет CustomWriteSupport, который вы можете увидеть как второй параметр конструктора super () в приведенном выше фрагменте. Здесь происходит много всего. Вы можете проверить репо для всего класса и посмотреть, что он делает.
По сути, класс проверяет схему, чтобы определить тип данных каждого поля. После этого с использованием экземпляра класса RecordConsumer данные записываются в файл. Я не буду много говорить об этих двух классах, потому что а) я их не писал и б) код достаточно прост, чтобы любой мог его понять.
Подготовка данных для файла Parquet
Давайте подготовим данные для записи в файлы Parquet. Список строк представляет собой один набор данных для файла Parquet. Каждый элемент в этом списке будет значением корректирующего поля в файле схемы. Например, предположим, что у нас есть следующий список:
Глядя на файл схемы, мы можем сказать, что первое значение в массиве - это идентификатор, второе значение - это имя, а третье значение - это логический флаг для активного поля.
Итак, в нашем коде у нас будет список из String для представления нескольких строк. Да, вы правильно прочитали, это список строк:
List<List<String>> columns = getDataForFile();
Давайте посмотрим на функцию, чтобы увидеть, как мы генерируем данные:
private List<List<String>> getDataForFile() {
List<List<String>> data = new ArrayList<>();
List<String> parquetFileItem1 = new ArrayList<>();
parquetFileItem1.add("1");
parquetFileItem1.add("Name1");
parquetFileItem1.add("true");
List<String> parquetFileItem2 = new ArrayList<>();
parquetFileItem2.add("2");
parquetFileItem2.add("Name2");
parquetFileItem2.add("false");
data.add(parquetFileItem1);
data.add(parquetFileItem2);
return data;
}
Это довольно просто, правда? Тогда пойдем дальше.
Получение файла схемы
Как мы уже говорили, у нас есть файл схемы. Нам нужно включить эту схему в код, в частности, как экземпляр класса MessageType. Посмотрим, как это сделать:
MessageType schema = getSchemaForParquetFile();
...
private MessageType getSchemaForParquetFile() throws IOException {
File resource = new File(schemaFilePath);
String rawSchema = new String(Files.readAllBytes(resource.toPath()));
return MessageTypeParser.parseMessageType(rawSchema);
}
Как видите, мы просто читаем файл как строку, а затем анализируем эту строку с помощью метода parseMessageType () в классе MessageTypeParser, предоставляемом библиотекой Apache.
Получение паркетного писателя
Это почти последний шаг в процессе. Нам просто нужно получить экземпляр класса CustomParquetWriter, который мы обсуждали ранее. Здесь мы также указываем путь к выходному файлу, в который будет записывать писатель. Код для этого также довольно прост:
CustomParquetWriter writer = getParquetWriter(schema);
...
private CustomParquetWriter getParquetWriter(MessageType schema) throws IOException {
String outputFilePath = outputDirectoryPath+ "/" + System.currentTimeMillis() + ".parquet";
File outputParquetFile = new File(outputFilePath);
Path path = new Path(outputParquetFile.toURI().toString());
return new CustomParquetWriter(
path, schema, false, CompressionCodecName.SNAPPY
);
}
Запись данных в файл паркета
Это последний шаг, нам просто нужно записать данные в файл. Мы зациклим список созданного нами списка и запишем каждый список в файл, используя средство записи, созданное на предыдущем шаге:
for (List<String> column : columns) {
writer.write(column);
}
logger.info("Finished writing Parquet file.");
writer.close();
Вот и все. Вы можете перейти в выходной каталог и проверить созданный файл. Например, вот что я получил после запуска этого проекта:
Если вы хотите начать непосредственно с рабочего примера, вы можете найти проект Spring Boot в моем репозитории Github. А если у вас есть сомнения или вопросы, не стесняйтесь спрашивать меня в комментариях.
Первоначально опубликовано на https://blog.contactsunny.com 7 апреля 2020 г.