Подсказка: многопоточность. Используя PDI и PostgreSQL, мы можем загрузить огромный файл Excel за несколько минут.

Все больше и больше компаний начинают осознавать важность данных. Следовательно, они приходят с запросами на загрузку огромных файлов CSV или Excel из своих устаревших систем или ручных процессов в базу данных для аналитики на основе данных. Я знаю, что теперь у нас есть много решений для решения этой проблемы, такие как pandas, dask, vaex библиотеки Python или инструменты, такие как Informatica и т. Д.

Однако всегда интересно изучить разные подходы к решению задачи. Мы будем использовать PDI для решения этой задачи и использовать PostgreSQL в качестве нашей базы данных. Идея здесь заключается в том, чтобы оптимально использовать возможности нашей системы. Я знаю, что не все из нас имеют в своем распоряжении серверы с оптимизацией памяти.

Если вы новичок в процессе построения конвейера данных, я порекомендую вам ознакомиться с приведенной ниже историей.



Предпосылка

  • PDI: в вашей системе установлен Pentaho Data Integration. Вы можете воспользоваться ссылкой для получения пошаговой инструкции по установке.
  • PostgreSQL: мы можем использовать любую реляционную или нереляционную базу данных по своему усмотрению. Если вы хотите продолжить, воспользуйтесь ссылкой, чтобы установить то же самое.

Истории пользователей

Мне нравится сначала определять пользовательские истории для нашей постановки проблемы. Это помогает мне спроектировать высокоуровневую архитектуру конвейера данных. Нам нужно всегда разбивать проблему на небольшие, простые отдельные части.

  1. Я хочу прочитать огромный CSV-файл с экономическими данными, содержащими миллионы записей.
  2. Я хочу выполнить поиск каждого содержимого в данных с помощью таблицы измерения / главной.
  3. Я хочу очистить данные и удалить поля NULL.
  4. Я хочу добавить условие на уровне строки. Я хочу добавить «_R», если в столбце статуса есть слово «Исправлено».
  5. Я хочу загрузить то же самое в базу данных PostgreSQL.

Мы пытаемся воспроизвести реальный сценарий, добавив немного сложности манипулированию данными.

Входные данные

Хорошая практика - разбираться в файлах входных данных. Теперь, в нашем случае, может быть сложно открыть огромные файлы CSV и проверить столбцы и строки. Однако есть методы, с помощью которых мы можем определить или проверить данные выборки. PDI позволяет читать образцы данных и проверять другие метаданные, создав небольшое преобразование.

Здесь я ввел в Google термин огромный файл данных в формате csv и загрузил файл с первого веб-сайта. Вот ссылка.

Теперь я хотел разбить систему и создать огромный файл; как будто мы имеем дело с большими данными, не так ли? В загруженном файле было 66 526 записей, поэтому я добавил одни и те же записи несколько раз, чтобы создать огромный файл примерно с 11 974 697 записями; да, не такой уж большой.

Тестовые кейсы

Здесь важно определить тестовые примеры, поскольку мы не можем вручную проверить все данные. Нам нужно убедиться, что мы проверяем достаточно хорошие образцы данных для перекрестной проверки точности.

  1. Проверьте количество строк и соответствие входным данным. Обратите внимание, так как мы будем удалять данные NULL. Также важно хранить эти NULL-записи.
  2. Перекрестная проверка результатов вывода измерений по крайней мере для 500–1000 записей.
  3. Перекрестная проверка расчетов случайным образом для проверки точности; опять по крайней мере для тысячи записей.

Шаг 1: Подготовка к проекту

У нас есть довольно простой проект для этого проекта. Всего один каталог и один файл преобразования.

Я предпочитаю создавать все связанные с работой проекты в одном каталоге проектов под названием «Работа»; Я знаю, как творчески! Нам необходимо выполнить следующее: вы можете пропустить этот шаг.

  1. Создайте каталог нашего проекта - LoadData.
  2. Создайте каталог «Вход» в каталоге проекта.
  3. Создайте пустое преобразование с именем «Main.ktr» в каталоге проекта.

Если вам неизвестны такие слова, как «трансформации» или «работа», я порекомендую вам рассказ, упомянутый ниже.



Шаг 2: Создание таблицы базы данных

Я предполагаю, что у вас уже установлена ​​база данных. Мы используем PostgreSQL.

Сейчас я предпочитаю создавать таблицы с помощью Django Models. Вам не обязательно использовать эту методологию.

Сказав это, это упрощает нашу жизнь, создавая модели Django вместо того, чтобы вручную создавать таблицы и столбцы изначально. Модели Django делают это за нас с помощью простой команды миграции, а также получают функциональность CRUD (создание, чтение, обновление и удаление) из коробки.

Вы можете выбрать два указанных ниже варианта для создания базы данных и таблицы. Я создал таблицу medium_db

  • Скрипт создания PostgreSQL.
 — Table: public.economic_data
 — DROP TABLE public.economic_data;
CREATE TABLE public.economic_data
(
id integer NOT NULL DEFAULT nextval(‘economic_data_id_seq’::regclass),series_reference character varying(255) COLLATE pg_catalog.”default” NOT NULL,
indicator_name character varying(255) COLLATE pg_catalog.”default” NOT NULL,
period character varying(45) COLLATE pg_catalog.”default” NOT NULL,
indicator_value numeric(30,10) NOT NULL,
status character varying(255) COLLATE pg_catalog.”default” NOT NULL,
indicator_unit character varying(255) COLLATE pg_catalog.”default” NOT NULL,
group_name character varying(255) COLLATE pg_catalog.”default” NOT NULL,
series_name character varying(255) COLLATE pg_catalog.”default”,
 CONSTRAINT economic_data_pkey PRIMARY KEY (id)
)
TABLESPACE pg_default;
ALTER TABLE public.economic_data
OWNER to YOURUSER;
  • Скрипт модели Django для запуска миграции.
from django.db import models
# Create your models here.
class EconomicData(models.Model):
series_reference = models.CharField(
db_column="series_reference",
max_length=255,
help_text="Unique code to identify a particular record",
verbose_name="Series Reference",
)
indicator_name = models.CharField(
db_column="indicator_name",
max_length=255,
verbose_name="Name of the indicators"
)
period = models.CharField(
db_column="period",
max_length=45,
verbose_name="Period"
)
indicator_value = models.DecimalField(
db_column="indicator_value",
max_digits=30,
decimal_places=10,
verbose_name="Value of the Field"
)
status = models.CharField(
db_column="status",
max_length=255,
verbose_name="Status of the value For eg, Final or Revised"
)
indicator_unit = models.CharField(
db_column="indicator_unit",
max_length=255,
verbose_name="Unit of the indicators"
)
group_name = models.CharField(
db_column="group_name",
max_length=255,
verbose_name="Group of the indicators"
)
series_name = models.CharField(
db_column="series_name",
max_length=255,
verbose_name="Series of the indicators"
null=True
)
def __str__(self):
return f"{self.indicator_name} - {self.value}"
class Meta:
db_table = "economic_data"
verbose_name = "Economic Data"
verbose_name_plural = "Economic Data"

Если вы хотите понять, как мы можем извлечь выгоду из Django в нашем конвейере данных, сообщите мне об этом в разделе ответов ниже.

У нас есть готовый стол. Теперь давайте создадим нашу трансформацию.

Шаг 3: преобразование загрузчика

Нам нужно создать преобразование загрузчика, которое считывает наш входной CSV, выполняет манипуляции и загружает данные в базу данных.

Нам нужно открыть наш файл Main.ktr и перетащить какой-нибудь плагин, как указано ниже.

Шаг 1. Перетащите шаги

  1. Во-первых, давайте добавим небольшое описание преобразования. Документация - это ключ к любому конвейеру данных.
  2. Перетащите плагины 'CSV file input', 'Data grid', 'Join rows (cartesian product)', 'User defined Java expression', 'Filter rows', 'Text file output', 'Table Output' с вкладки дизайна на холст.
  3. Переименуйте поля в соответствии с нашим соглашением об именах.

Шаг 2: настройка свойств

  1. Нам нужно настроить свойства для каждого из вышеупомянутых шагов. Позвольте настроить для нашего шага ввода CSV, нам нужно найти наш входной файл в поле Имя файла и нажать Получить поля. Мы можем настроить размер буфера NIO в соответствии с доступностью нашей памяти; он будет обрабатывать файлы пакетами по 50К записей в каждом.
  2. Нам нужно добавить данные в Data Grid (здесь репликация таблицы). Здесь мы используем сетку данных, например данные. В реальном сценарии вы получите эти данные из некоторой таблицы измерений. Мы стандартизируем названия групп. Вы можете обратиться к приведенному ниже снимку экрана для получения данных. Нам нужно добавить имена столбцов на вкладке Мета и фактические данные на вкладке Данные .
  3. На этапе «Объединить строки» нам нужно сопоставить поля, которые мы хотим получить от ввода, с нашей таблицей / сеткой измерений. Поскольку здесь мы отображаем группы, нам нужно добавить Условие, чтобы сопоставить то же самое.
  4. В пользовательском выражении Java мы настроим пользовательское условие на уровне строки. Нам нужно определить наше Новое поле как series_reference_flag, здесь мы хотим изменить поле Series_reference и добавить _R, если столбец статуса Revised. В наше выражение Java мы добавим следующее условие - status ==« Revised ? Series_reference + _R: Series_reference ’; это Java-код. Мы можем выполнить похожие условия или расчеты, Мощно! Наконец, нам нужно добавить Тип значения к "Строке".
  5. На шаге Фильтр строк нам нужно определить наше условие передачи записей без нулевых значений.
  6. В выводе текстового файла (отчет об ошибках) нам нужно добавить Имя файла как ' $ {Internal.Entry.Current.Directory} / error_report ' и измените Расширение на CSV.
  7. На шаге вывода таблицы нам нужно создать новое соединение для подключения к нашей базе данных. Мы можем подключиться к любой базе данных в соответствии с нашими требованиями. Здесь мы подключимся к PostgreSQL; подробности подключения см. на снимке экрана. Нам нужно перейти от целевой таблицы к «экономическим_данным». Нам нужно проверить поле Указать поля базы данных. Затем нам нужно сопоставить поля ввода / преобразования с полями таблицы.

Шаг 4. Давайте ускорим процесс

Теперь, когда мы настроили свойства, мы можем ускорить процесс, создав несколько потоков для вставки данных. Это повысит производительность. PDI предоставляет нам возможность настраивать многопоточность по шагам. Если мы используем его на шаге ввода, он умножит записи. Принимая во внимание, что, если мы используем его для шагов вывода, таких как база данных, он будет распространять записи.

PDI предоставляет нам множество возможностей для оптимизации производительности. Мы можем выполнить следующие шаги, чтобы повысить производительность.

  1. Измените размер буфера NIO на этапе ввода, определите размер пакета.
  2. Измените Макс. размер кеша (в строках) на этапе поиска определите количество строк, которые он будет хранить в кэш-памяти вместо запроса к базе данных.
  3. Измените Размер фиксации, аналогично размеру буфера, измените размер пакета для вставки записей.
  4. Используйте несколько потоков для выполнения действия, мы можем добавить фиктивный шаг и щелкнуть правой кнопкой мыши, чтобы выбрать Изменить количество копий на начало с 1 на любое значение, превышающее 1, в соответствии с нашими требованиями перед нашим шагом вывода.
  5. Обратите внимание: если мы хотим выполнить несколько потоков на шаге вывода таблицы, мы не можем использовать точку номер четыре. Затем нам нужно будет добавить фиктивный шаг перед выводом и распределить записи по нескольким шагам таблицы вывода.

Вау! у нас так много вариантов, стоит ли менять все оптимизатор производительности? Короткий ответ - НЕТ. Нам нужно попробовать с выборочными данными и выполнить несколько тестов, чтобы выбрать то, что лучше всего подходит для нас.

Шаг 2: оценка

Давайте запустим поток без оптимизатора производительности, а затем сравним его, применив оптимизатор.

Мы сократили время, затрачиваемое на выполнение того же действия почти на 50%, за счет добавления простого оптимизатора производительности.

Если мы хотим прочитать несколько файлов и создать цикл для их обработки, я порекомендую вам ознакомиться с приведенной ниже историей.



Вывод

Мы взяли постановку проблемы и попытались решить ее, используя несколько подходов, а также попытались ее оптимизировать. Теоретически вы можете применить этот процесс в соответствии со своими требованиями и попытаться его оптимизировать. PDI также предоставляет нам шаг массовой загрузки PostgreSQL; Я тоже пробовал этот шаг. Однако не было никакого значительного повышения производительности, предоставленного им.

Мы не можем оптимизировать код / ​​конвейер вначале, и нам придется выполнить несколько тестов, чтобы получить идеальные результаты. Однако, чтобы сократить кривую обучения, вы всегда можете прочитать мой опыт и найти решения проблемы, подписавшись на мой список адресов электронной почты, используя приведенную ниже ссылку.



Увидимся в следующем посте. Счастливый ETL