Качество данных должно быть в центре внимания каждого центра разработки данных. «Мусор на входе, мусор на выходе» - это принцип, по которому каждая команда инженеров данных должна жить и стремиться предоставлять данные высочайшего качества для целей аналитики и машинного обучения.

При этом поддерживать качество данных легче сказать, чем сделать. Из-за множества источников, отправляющих данные с переменной скоростью и с переменными форматами, становится невозможным отследить, какой источник принес неверные данные. Разделение плохих записей и сообщение о них владельцам источников имеют первостепенное значение для поддержания высокого качества данных в озерах и на складах.

В этом посте мы увидим, как мы можем использовать Apache NiFi для настройки потока качества данных, который разделяет плохие записи.

Таблица содержания

  1. Настройка на GCP
  2. Создать график качества данных
  3. Понимание потока
  4. Вывод

1. Настройка на GCP

Давайте предоставим инфраструктуру, которая нам понадобится для запуска нашего экземпляра NiFi.

Учетная запись службы

gcloud iam service-accounts create apache-nifi-sa --description="Service Account to run the Apache NiFi" --display-name="Apache NiFi Service Account"

Ведра

Теперь давайте создадим ведра. Мы создадим три корзины: одну для хранения наших артефактов, одну для входных данных и другую для выходных данных.

gsutil mb gs://<PROJECT-ID>-artif
gsutil mb gs://<PROJECT-ID>-input
gsutil mb gs://<PROEJCT-ID>-output

Я

Доступ к сегменту

gsutil iam ch serviceAccount:apache-nifi-sa@<PPROJECT-ID>.iam.gserviceaccount.com:roles/storage.admin gs://<PPROJECT-ID>-artif
gsutil iam ch serviceAccount:apache-nifi-sa@<PPROJECT-ID>.iam.gserviceaccount.com:roles/storage.admin gs://<PPROJECT-ID>-input
gsutil iam ch serviceAccount:apache-nifi-sa@<PPROJECT-ID>.iam.gserviceaccount.com:roles/storage.admin gs://<PPROJECT-ID>-output

Доступ к Dataproc

gcloud projects add-iam-policy-binding <PROJECT-ID> --member='serviceAccount:apache-nifi-sa@<PROEJCT-ID>.iam.gserviceaccount.com' --role='roles/dataproc.editor'

core-site.xml

Этот файл позволит процессору FetchPaqrquet NiFi получать наши файлы данных из корзины на последующих этапах.

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
   <property>
      <name>fs.gs.impl</name>
      <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
   </property>
   <property>
      <name>fs.AbstractFileSystem.gs.impl</name>
      <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
      <description>The AbstractFileSystem for gs: uris.</description>
   </property>
   <property>
      <name>fs.gs.project.id</name>
      <value>PROJECT-ID</value>
      <description>Optional. Google Cloud Project ID with access to GCS buckets.
      Required only for list buckets and create bucket operations.</description>
   </property>
   <property>
      <name>fs.gs.auth.service.account.enable</name>
      <value>true</value>
      <description>Whether to use a service account for GCS authorization. If an email and
      keyfile are provided (see fs.gs.auth.service.account.email and
      fs.gs.auth.service.account.keyfile), then that service account
      will be used. Otherwise the connector will look to see if it is running on
      a GCE VM with some level of GCS access in it's service account scope, and
      use that service account.</description>
   </property>
   <property>
      <name>fs.gs.working.dir</name>
      <value>/</value>
      <description>The directory relative gs: uris resolve in inside of the default bucket.</description>
   </property>
   <property>
      <name>fs.gs.implicit.dir.repair.enable</name>
      <value>true</value>
      <description>Whether or not to create objects for the parent directories of objects
      with / in their path e.g. creating gs://bucket/foo/ upon deleting or
      renaming gs://bucket/foo/bar.</description>
   </property>
   <property>
      <name>fs.gs.glob.flatlist.enable</name>
      <value>true</value>
      <description>Whether or not to prepopulate potential glob matches in a single list
      request to minimize calls to GCS in nested glob cases.</description>
   </property>
   <property>
      <name>fs.gs.copy.with.rewrite.enable</name>
      <value>true</value>
      <description>Whether or not to perform copy operation using Rewrite requests. Allows
      to copy files between different locations and storage classes.</description>
   </property>
</configuration>

Также скачайте jar-файл коннектора GCS Hadoop. Убедитесь, что вы загрузили shared версию.

Загрузите эти два файла core-site.xml и gcs-connector-hadoop2–2.2.0-shaded.jar в корзину для артефактов.

Сценарий запуска

Мы будем использовать этот стартовый скрипт для инициализации нашей виртуальной машины.

#!/bin/bash
NIFI_IMAGE=apache/nifi:1.13.2
CLOUD_SDK_IMAGE=google/cloud-sdk:337.0.0
echo 'Creating artifacts directory'
mkdir -p /etc/nifi/artifs
mkdir -p /etc/nifi/artifs/jars/
echo 'Copying artifacts from bucket'
docker run -v /etc/nifi/artifs:/etc/nifi/artifs $CLOUD_SDK_IMAGE gsutil cp gs://<PROJECT-ID>-artif/core-site.xml /etc/nifi/artifs/
docker run -v /etc/nifi/artifs:/etc/nifi/artifs $CLOUD_SDK_IMAGE gsutil cp gs://<PROJECT-ID>-artif/gcs-connector-hadoop2-2.2.0-shaded.jar /etc/nifi/artifs/jars/
echo 'Starting Apache NiFi container'
docker run -d --name apache-nifi --net host -v /etc/nifi/artifs/:/etc/nifi/artifs -e NIFI_WEB_HTTP_HOST='0.0.0.0' $NIFI_IMAGE
echo 'Altering IPTables'
iptables -A INPUT -p tcp --dport 8080 -j ACCEPT

GCE VM

Затем мы создадим виртуальную машину с помощью сценария запуска.

gcloud beta compute --project=erudite-realm-303906 instances create apache-nifi --zone=us-central1-a --machine-type=n2-standard-4 --subnet=default --network-tier=PREMIUM --metadata=startup-script-url=gs://erudite-realm-303906-artif/startup-script.sh --maintenance-policy=MIGRATE --service-account=apache-nifi-sa@erudite-realm-303906.iam.gserviceaccount.com --scopes=https://www.googleapis.com/auth/cloud-platform --tags=apache-nifi-vm --image=cos-77-12371-1109-0 --image-project=cos-cloud --boot-disk-size=30GB --boot-disk-type=pd-balanced --boot-disk-device-name=apache-nifi --no-shielded-secure-boot --shielded-vtpm --shielded-integrity-monitoring --reservation-affinity=any

Ваша виртуальная машина перейдет в рабочее состояние через 10–15 секунд, но сценарию запуска потребуется некоторое время (5–6 минут) для полного выполнения и запуска экземпляра Apache NiFi.

Кроме того, вы не сможете получить доступ к пользовательскому интерфейсу, поскольку он будет заблокирован брандмауэром. Итак, мы создадим правило брандмауэра, разрешающее доступ к пользовательскому интерфейсу NiFi с общедоступных IP-адресов.

Правило брандмауэра

gcloud compute firewall-rules create allow-apache-nifi-ui --network default --priority 1000 --direction ingress --action allow --target-tags apache-nifi-vm --source-ranges 0.0.0.0/0 --rules tcp:8080 --enable-logging

Теперь вы сможете загрузить веб-интерфейс Apache NiFi по адресу http://<EXTERNAL-IP>:8080/nifi.

2. Создайте график качества данных.

В этом разделе мы настроим потоковый граф с помощью пользовательского интерфейса. У нас будут процессоры, которые будут читать паркетные файлы из корзины GCS, а затем фильтровать записи на NULL в определенном поле.

NiFi перемещает данные между разными процессорами в форме FlowFiles. Документация объясняет FlowFiles следующим образом.

FlowFiles лежат в основе NiFi и его дизайна, основанного на потоках. FlowFile - это запись данных, которая состоит из указателя на ее содержимое (полезные данные) и атрибутов для поддержки содержимого, связанного с одним или несколькими событиями происхождения.

Мы будем использовать образцы данных паркета, которые вы можете скачать из моего репозитория Github.

Я покажу вам полную блок-схему и подробно расскажу о каждом процессоре в следующем разделе.

Этот график использует 5 процессоров.

  1. ListGCSBucket: список файлов паркета, хранящихся во входном сегменте. Подробная информация о каждом файле передается в виде файла потока следующему процессору.
  2. FetchParquet - получение содержимого паркетных файлов из корзины. Каждый потоковый файл содержит содержимое паркетного файла.
  3. QueryRecords - фильтруйте файлы потока в два потока: один со всеми записями, имеющими ip_address как null, а другой без null. Процессор QueryRecord использует запросы ANSI SQL для создания множественных выходных отношений. SQL основан на Apache Calcite.
  4. PutGCSObject - поместите отфильтрованные записи в выходной сегмент (пустые, а не пустые значения помещаются в разные папки).

3. Понимание потока

В этом разделе мы разберемся с конфигурациями каждого процессора.

  1. ListGCSBucket

Конфигурации, выделенные жирным шрифтом, являются обязательными.

Сегмент - сегмент, из которого следует перечислить объекты.

Служба поставщика учетных данных GCP - объект службы, используемый для аутентификации запроса списка в GCP.

Количество повторных попыток - как следует из названия, количество повторных попыток в случае сбоя вызова.

Использовать поколения - если false операция вернет только последнюю версию каждого объекта, в противном случае она вернет каждую версию отдельно.

2. FetchParquet

Имя файла - имя файла для загрузки. Мы используем язык выражений NiFi, чтобы указать имя файла. Процессор ListGCSBucket добавляет атрибуты gcs.bucket и filename к каждому файлу потока.

Record Writer - объект для записи содержания. Мы будем использовать ParquetRecordSetWriter. Мы также можем использовать некоторые другие писатели, такие как Avro, Json, CSV, чтобы назвать несколько.

Ресурсы конфигурации Hadoop - путь к core-site.xml файлу. Мы смонтировали этот файл в нашем контейнере. В этом файле есть параметры конфигурации для нашей корзины GCS.

Дополнительные ресурсы пути к классам - путь к Jar коннектора GCS Hadoop. Этот файл также смонтирован в нашем контейнере.

Перейдите на вкладку Settings и выберите failure и retry, чтобы автоматически разорвать отношения. Мы будем использовать только путь success. Однако вы можете использовать и другие отношения и создать больше ребер в графе.

3. QueryRecord

Читатель записи - объект для чтения записей. Поскольку предыдущий процессор FetchParquet выдает Parquet записей, мы будем использовать ParquetReader.

Record Writer - мы также будем использовать Parquet в качестве формата вывода, поэтому для этого мы будем использовать существующий объект ParquetRecordSetWriter.

Добавьте следующие 2 новых свойства, чтобы создать новые потоки из существующего потока.

Пустые значения - запрос для фильтрации записей с ip_address как null

NotNulls - запрос для фильтрации записей с ip_address как not null

Перейдите на вкладку Settings и выберите failure и original, чтобы автоматически разорвать отношения.

4. PutGCSObject

Соедините отношения Null и NotNull от предыдущего процессора QueryRecord с PutGCSObject процессором.

Служба поставщика учетных данных GCP - объект службы, используемый для аутентификации запроса списка в GCP.

Количество повторных попыток - как следует из названия, количество повторных попыток в случае сбоя вызова.

Сегмент - сегмент вывода, в который будут записываться записи.

Ключ - ключ файла для записи в корзину. Мы будем разделять выходные файлы на префиксы valid и invalid.

Перейдите на вкладку Settings и выберите success и failure, чтобы автоматически разорвать отношения.

После настройки всего графика щелкните правой кнопкой мыши каждый процессор и выберите Start. Это запустит каждый процессор. Вы увидите зеленую кнопку на каждом процессоре, если все настроено правильно.

Примечание. Если вы видите предупреждающий знак на ваших процессорах с указанием Controllers are disabled, щелкните правой кнопкой мыши пустой холст, выберите configure и включите все Controller объекты, которые мы создали во время настройки.

Как только все будет запущено, загрузите образцы файлов паркета во входную корзину. Если все настроено правильно, вы увидите, как данные проходят через график.

Вы можете видеть Out как 5, так как мы загрузили 5 файлов. Вы можете пойти и проверить выходной сегмент, чтобы увидеть папки valid и invalid с файлами в них.

Вы также можете проверить, что в каждой папке есть 5 файлов.

4. Вывод

В этом блоге мы увидели, как легко настроить поток фильтрации качества данных в NiFi. Вы можете использовать этот образец графика в качестве отправной точки и добавить больше взаимосвязей и проверок.

Например, вместо того, чтобы просто писать недействительные записи, вы можете подключить какой-то механизм уведомления и уведомить владельца источника о плохих записях. Вы также можете отправить некоторую статистику в свой стек мониторинга и обновить показатели качества в реальном времени.

Благодаря простому в использовании пользовательскому интерфейсу и постоянно растущему количеству процессоров Sky - это предел для потоковых графиков, которые вы можете создавать с помощью NiFi.

Если вы обнаружите какие-либо проблемы в коде или у вас есть какие-либо вопросы в целом, не стесняйтесь оставлять комментарий.

А до тех пор счастливого кодирования! :)