Я создаю сквозной поток для потребления данных в HDFS, используя Consume Kafka для файлов Json, полученных через поток событий tealium.
В настоящее время я использовал
Consume Kafka -> Evaluate Json Path -> Jolttransform Json -> Merge Content -> Evaluate Json Path -> Update attribute -> PutHDFS ->MoveHDFS
Требуется прочитать данные JSON для спула за весь день в один файл со ссылкой на атрибут postdate (скрытая эпоха до отметки времени YYYYMMDDSS
раньше) и читать данные ежедневно для объединения в один выходной файл и, наконец, переименовать файл в соответствии с отметкой времени, связанной с POST_DATE. поле для различения ежедневных файлов.
Папка вывода текущей даты должна содержать только файлы обработки текущей даты, а весь завершенный файл вывода для более ранних дат должен перемещаться в другую папку.
Не могли бы вы помочь мне, как работать с MoveHDFS для рекурсивного поиска в папке hdfs и перемещения завершенных выходных файлов, не равных текущей дате, для перемещения в другую папку.