На ум приходит Apache Nifi, если вы ищете простой, но надежный инструмент для обработки данных из различных источников. Вы хотите поэкспериментировать, но не знаете, с чего начать? Это руководство для вас.

Система, над которой я работаю, вот-вот подвергнется значительному техническому обновлению. Цель состоит в том, чтобы устранить существующие препятствия, мешающие принимать больше данных. Для решения проблемы требуется продукт, который получает входные данные из внешних источников, обрабатывает их и распространяет результаты по назначению.

Apache Nifi реализует парадигму поточного программирования (FBP); он состоит из процессов черного ящика, которые обмениваются данными через предопределенные соединения (отрывок из Википедии).

Короче говоря, Apache NiFi - это инструмент для обработки и распространения данных. Его интуитивно понятный пользовательский интерфейс поддерживает определения маршрутизации, различные разъемы (вход / выход) и множество встроенных процессоров. Все эти функции в совокупности делают его подходящей дополнительной платформой для нашего варианта использования.

Принимая во внимание будущие потребности нашей системы, мы решили тщательно оценить Nifi. Отправной точкой является настройка среды.

В этой статье я опишу, как настроить среду Nifi с использованием образов Docker и запустить простой предопределенный шаблон; Создание потока Nifi с нуля будет рассмотрено в другой статье. Основные три части статьи:

  • Обзор концепций и строительных блоков Apache Nifi
  • Настройка Nifi Flow и Nifi Registry (на основе образов Docker)
  • Загрузка шаблона и его запуск

Готовый? Начнем с основ.

Компоненты и концепции Nifi

Nifi основан на следующей иерархии:

  • Группа процессов
    Набор процессоров и их соединений. Группа процессов - это наименьшая единица, сохраняемая в системе контроля версий (Nifi Registry). Группа процессов может иметь порты ввода и вывода, позволяющие подключать группы процессов. При этом поток данных может состоять более чем из одной группы процессов.
  • Процессор
    Процессор, который (в большинстве случаев) имеет вход и выход, связанный с другим процессором с помощью соединителя . Каждый процессор представляет собой черный ящик, который выполняет одну операцию. ; например, процессоры могут изменять содержимое или атрибуты FlowFile (см. ниже).
  • FlowFile
    Это логический набор данных, состоящий из двух частей (содержимое и атрибуты), который передается между процессорами Nifi. Объект FlowFile неизменяем, но его содержимое и атрибуты могут изменяться во время обработки.
  • Соединение
    Соединение - это очередь, которая направляет FlowFiles между процессорами. Логика маршрутизации основана на условиях, связанных с результатом процессора; соединение связано с одним или несколькими типами результатов. Условия подключения - это отношения между процессорами, которые могут быть статическими или динамическими. Хотя статические отношения фиксированы (например, Успех, Неудача, Соответствие или Несоответствие), динамические отношения основаны на атрибутах FlowFile, определенных пользователем; последний раздел этой статьи иллюстрирует эту функцию с помощью процессора RouteOnAttribute.
  • Порт
    Точки входа и выхода из группы процессов. Каждая группа процессов может иметь один или несколько портов ввода или вывода, различающихся по именам.
  • Последовательность
    объединяет данные из нескольких подключений в одно подключение.

В потоке Nifi ниже изображены эти компоненты:

Изучив компоненты потока данных Nifi, давайте посмотрим, как настроить среду.

Настройка среды Nifi

Вместо того, чтобы устанавливать Nifi на свой компьютер, я решил работать с Nifi, размещенным в контейнерах Docker, в основном по следующим причинам:

  • Переносимость: приложение Nifi можно реплицировать или переместить на другой хост.
  • Гибкость, позволяющая иметь несколько приложений Nifi на одном хосте, каждое с отдельным портом.
  • Малая занимаемая площадь: отсутствие необходимости менять хост-машину.
  • Возможность заморозить среду при создании снимков в процессе определения потоков Nifi.

Запуск приложения Nifi

Я использовал Ubuntu, Linux на основе Debian, поэтому вы можете найти команды для установки и настройки Docker в этой статье.

После установки Docker загрузите последнюю версию образа Nifi с DockerHub.

Вы можете создать и запустить контейнер Docker с настройками по умолчанию:

$ docker run apache/nifi

Однако я хочу создать более сложный контейнер, который позволяет:

  • Назовите мой контейнер (я назвал его nifi2)
  • Управление портом, а не портом 8080 по умолчанию
  • Возможность видеть загрузку и стандартный вывод Nifi
  • Создайте общий том между моим хостом (shared-directory) и контейнером (ls-target)

В приведенном ниже примере команды достигается указанное выше (имя контейнера - nifi2):

$ docker run --name nifi2 -p 8091:8080 -i -v ~/document/Nifi/shared-directory:/opt/nifi/nifi-current/ls-target apache/nifi

Ура! ваше приложение Nifi запущено и доступно через порт 8091.

Далее настраиваем инструмент контроля версий - Nifi Registry.

Запуск реестра Nifi

Nifi Registry - это отдельный подпроект Nifi, который позволяет контролировать версии потоков Nifi. Он позволяет сохранять состояние потока, разделять потоки между различными приложениями Nifi, включать откаты и другие функции контроля версий.

Хотя вы можете сделать резервную копию файла flowfile.xml.gz, который содержит всю информацию о группах процессов, правильным способом управления версиями является использование Nifi Registry; его главное преимущество - простота сохранения изменений, инкапсулированных в специальный файл flow.json для каждой группы процессов.

Образ реестра Nifi доступен в Docker Hub; запустите команду docker pull, чтобы установить его.

Команда Docker ниже определяет и запускает контейнер реестра Nifi. Я решил не использовать параметр -d (отсоединить), как это предлагается в DockerHub, поэтому ход выполнения и вывод контейнера будут видны во время его работы. В отличие от контейнера Nifi, на этот раз я решил оставить порт по умолчанию, поскольку я не буду использовать несколько экземпляров реестра Nifi.

$ docker run --name nifi-registry -p 18080:18080 apache/nifi-registry

Подключение приложения Nifi к контролю версий

Как правило, мы можем подключить приложение Nifi к одному или нескольким реестрам. Это обеспечивает гибкость при работе с более чем одной средой.

Чтобы подключить наше приложение Nifi (http: // localhost: 8091 / nifi) к реестру, откройте настройки Nifi → Клиент реестра → [+].

Далее нам понадобится адрес реестра. Поскольку приложение Nifi не запущено на хост-машине, попытка доступа к реестру Nifi с адресом http: // localhost: 18080 / nifi-registry не сработает. Наше приложение Nifi запускается из контейнера, поэтому ему нужен IP-адрес внешнего хоста. IP-адрес хост-машины можно получить, проверив контейнер Nifi.

$ docker inspect nifi2

IP-адрес хоста - это шлюз:

Поскольку вывод этой команды обширен, приведенная ниже команда получит только IP-адрес шлюза:

$ docker inspect nifi2 --format='{{.NetworkSettings.Networks.bridge.Gateway}}'

После этого определение реестра Nifi просто. На изображении ниже показано приложение Nifi, подключенное к двум экземплярам Nifi Registry.

Наконец, после настройки хотя бы одного реестра появляется новая опция меню для сохранения группы процессов в системе контроля версий.

Настройка сегмента - это следующий и последний шаг в настройке реестра Nifi. Корзина - это контейнер одной или нескольких версий групп процессов; в реестре Nifi должна быть хотя бы одна корзина для хранения в ней потоков Nifi.

Давайте создадим корзину и назовем ее Flow-Development. На изображении ниже я создал две корзины: одну для разработки, а другую - для постановки:

Хотя наш холст Nifi пуст и нет группы процессов для сохранения, основы есть. После того, как мы создадим группу процессов, ее можно будет сохранить в реестр.

Теперь мы готовы двигаться дальше.

Изучение контейнера Nifi

Как только контейнер Nifi запущен, мы можем запустить commanddocker exec, чтобы войти в контейнер и изучить его; следующая команда запускает bash в контейнере (с использованием параметров -i для интерактивности и -t для терминала):

$ docker exec -i -t nifi2 /bin/bash

Перемещаясь по каталогам контейнера, вы можете перейти к каталогу thenifi/conf, в котором находится файл flow.xml.gz. Этот файл содержит всю информацию о полотне пользовательского интерфейса Nifi, включая все группы процессов; любое изменение сохраняется автоматически.

Другой важный файл - conf/nifi.properties. Он содержит конфигурацию Nifi, включая расположение flow.xml.gz. Подробнее о конфигурационном файле по этой ссылке.

Репозитории Nifi

У Nifi есть три основных репозитория:

  1. Репозиторий FlowFile: хранит метаданные FlowFiles во время активного потока.
  2. Репозиторий содержимого: хранит фактическое содержимое FlowFiles.
  3. Репозиторий Provenance: хранит снимки файлов FlowFiles в каждом процессоре. При этом он описывает подробный поток данных и изменения в каждом процессоре и позволяет детально обнаруживать цепочку событий.

После входа в контейнер Nifi эти репозитории показывают:

Импорт потока Nifi

Итак, после установки и запуска приложения Nifi и подключения его к Nifi Registry, вы готовы запустить поток данных.

Вместо того, чтобы создавать поток Nifi с нуля, давайте загрузим существующий. Есть два варианта загрузки Flow в Nifi:

  • Импортировать шаблон в Nifi Flow (файл XML)
  • Импортируйте определение потока (файл JSON) в Nifi Registry, а затем создайте из него группу процессов.

Первый прост, в то время как второй немного сложнее, но его можно легко сделать с помощью инструментария Nifi, который не рассматривается в этой статье.

Вы можете скачать образец шаблона с GitHub, а затем загрузить его в свое приложение Nifi:

Понимание потока шаблона

Этот шаблон включает пять шагов, начиная с создания файла со случайным текстом, его переименования, извлечения свойств и направления результатов в каталоги на основе извлеченных свойств (подробности отображены в репозитории GitHub).

После завершения обработки файлы сохраняются в локальном каталоге самого контейнера. Возврат к команде контейнера run показывает, что он включает определение общего тома (ls-target). Этот том является местом назначения обрабатываемых файлов, и поэтому к нему можно получить доступ с хоста:

$ docker run --name nifi2 -p 8091:8080 -i -v ~/document/Nifi/shared-folder:/opt/nifi/nifi-current/ls-target apache/nifi

Устранение неполадок: у вас может возникнуть проблема с перетаскиванием файлов в каталог, если разрешения не были предоставлены: (сообщение об ошибке: «разрешение отклонено»):

Чтобы устранить эту проблему, предоставьте разрешения для каталога (команда chmod).

К этому каталогу также можно получить доступ из контейнера:

Откройте переменные группы процессов и увидите, что целевая папка определена там:

Погружение в процессоры Nifi

Вы можете запустить все процессоры одновременно, щелкнув правой кнопкой мыши на холсте (не на конкретном процессоре) и нажав кнопку Пуск. Через некоторое время файлы должны попасть в папки назначения.

Каждый сгенерированный файл подвергается процессу изменения своего имени и маршрутизации в зависимости от его содержимого; Я предпочел сделать это простым и не менять содержимое файлов. Тем не менее, этот шаблон иллюстрирует некоторые особенности процессоров Nifi:

  • Извлечение атрибутов на основе содержимого:

  • Пример языка выражения Nifi:

  • Логика маршрута на основе атрибутов:

В шаблоне используются следующие процессоры:

  • GenerateFlowFile: генерирует FlowFile, полезные для целей тестирования. Это позволяет контролировать частоту, количество и размер каждого FlowFile.
  • ExtractText: извлекает текст из содержимого FlowFile.
  • UpdateAttribue: обновляет атрибуты FlowFile.
  • RouteOnAttribute: создает динамические отношения и определяет логику маршрутизации на основе значения атрибута.
  • PutFile: сохраняет содержимое FlowFile в каталог. В этом примере процессор создает целевой каталог, если он не существует, и устанавливает разрешения для файлов.

Демонстрация функции противодавления

После запуска процесса вы можете остановить процессор Обновить имя файла и посмотреть, что произойдет. Вскоре коннектор, который передает FlowFiles в Обновить имя файла, достигнет предела и подавит поток файлов в предыдущий процессор в потоке.

Это поведение является демонстрацией определения обратного давления; соединитель ограничен количеством FlowFiles или их общим размером. Если этот механизм срабатывает, это указывает на проблему. Это триггер для начала расследования того, что пошло не так. Это может быть проблема, например, неисправность процессора, или указание на то, что основная нагрузка увеличилась, и, следовательно, поток должен быть настроен или распределен.

Сохранение группы процессов в реестр Nifi

Запустив шаблон и немного изменив его, вы хотите его сохранить. Помните Nifi Registry?

Выберите «Начать управление версиями» и сохраните группу процессов в ранее созданном сегменте:

Nifi указывает, зафиксировано ли последнее состояние группы процессов в системе контроля версий или нет: зеленая галочка означает, что оно было сохранено в реестре. Если есть изменения, которые не были зафиксированы в реестре, они помечаются темной звездочкой.

Некоторые рекомендации перед закрытием

  • Рекомендуется использовать группы процессов, поскольку это минимальная единица для сохранения в реестре Nifi.
  • Используйте комментарии, метки и цвета для документирования и организации потока Nifi.
  • Nifi имеет обширные возможности автоматизации (Nifi REST API, Nifi Toolkit), которые не рассматривались в этой статье. Nifi Toolkit может автоматизировать процессы с помощью инструмента командной строки; Помимо более эффективной работы, преимущества такой автоматизации значительны для создания конвейера CI / CD.

Что дальше?

Если вы дошли до этого места, вы получили рабочую платформу для реализации вашего приложения; в этом шаблоне используется только несколько базовых процессоров, но он может стать трамплином для продолжения построения более сложного потока, использующего впечатляющее разнообразие процессоров Nifi.

Эта статья затрагивает только верхушку айсберга; необходимо охватить множество аспектов и функций, таких как происхождение данных, ведение журнала, переменные и параметры, службы Nifi, вывод данных конвергентных процессоров с использованием воронок и многое другое. Вспомогательные инструменты для автоматизации - тоже огромная область, которую нужно охватить.

Я оставлю эти темы для следующих статей.

Продолжайте строить!

- Лиор

Использованная литература: