Apache Nifi для перемещения файлов в новую папку hdfs для файлов младше текущей даты

Я создаю сквозной поток для потребления данных в HDFS, используя Consume Kafka для файлов Json, полученных через поток событий tealium.

В настоящее время я использовал

Consume Kafka -> Evaluate Json Path -> Jolttransform Json -> Merge Content -> Evaluate Json Path -> Update attribute -> PutHDFS ->MoveHDFS

Требуется прочитать данные JSON для спула за весь день в один файл со ссылкой на атрибут postdate (скрытая эпоха до отметки времени YYYYMMDDSS раньше) и читать данные ежедневно для объединения в один выходной файл и, наконец, переименовать файл в соответствии с отметкой времени, связанной с POST_DATE. поле для различения ежедневных файлов.

Папка вывода текущей даты должна содержать только файлы обработки текущей даты, а весь завершенный файл вывода для более ранних дат должен перемещаться в другую папку.

Не могли бы вы помочь мне, как работать с MoveHDFS для рекурсивного поиска в папке hdfs и перемещения завершенных выходных файлов, не равных текущей дате, для перемещения в другую папку.


person Deepak    schedule 11.10.2019    source источник


Ответы (1)


Текущий поток работал нормально. Использовать Kafka -> Оценить путь Json -> Jolttransform Json -> Объединить содержимое -> Оценить путь Json -> Обновить атрибут -> PutHDFS ---> Чтобы создать файл слияния.

Создайте отдельный поток, как только вышеуказанный поток будет выполнен, чтобы получить обработанный файл слияния и повторно обработать его с помощью listhdfs-> fethchdfs-> updateattribute-> puthdfs

В listhdfs установите минимальное время ожидания файла до использования. Это позволит процессу рекурсивно искать файлы, а также использовать атрибут обновления для воссоздания папки в соответствии с родительской папкой для повторного использования файла процесса.

person Deepak    schedule 24.10.2019