Источник каталога буферизации Flume не имеет возможности удалять игнорируемые файлы. Удаляет сразу/никогда только обработанные файлы.
Есть три способа получить решение этой проблемы.
Во-первых, вы можете решить проблему явно (с помощью сценария оболочки или любой другой небольшой программы, которая может найти файл, игнорирующий шаблон, и удалить его). На мой взгляд, это не лучший способ сделать это.
Во-вторых, вы можете написать свой собственный источник каталога буферизации с реализацией исходного интерфейса Flume. Это требует больших усилий и сложной задачи для такого рода небольшой проблемы.
В-третьих, оскорбительное решение, вы можете использовать Morphline Interceptor. Перехватчик Morphline упоминается в этой части руководства пользователя Flume. Также вы можете взглянуть на Справочник по морфлайнам
Перехватчики получают событие из источника, выполняют некоторую обработку и, наконец, пересылают его в канал, как вы знаете.
Если вы выберете третье решение, для этого вам нужно использовать kite-sdk. Вы должны добавить зависимость Cloudera Kite Morphlines Core к вашему FLUME_CLASSPATH с помощью flume-env.sh или просто добавьте банку в $APACHE_FLUME_HOME/lib
В этом решении ваш пример конфигурации Flume будет:
a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
a1.sources.src-1.interceptors = morph
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /spool/dir
a1.sources.src-1.fileHeader = true
a1.sources.src-1.ignoredPattern = 'whatever'
a1.sources.src-1.interceptors.morph.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.src-1.interceptors.morph.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.src-1.interceptors.morph.morphlineId = morphline1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = ch-1
a1.sinks.k1.sink.directory = /roll/dir
Затем вы можете создать собственный файл перехватчика morphline как $APACHE_FLUME_HOME/conf/morphline.conf.
В этом файле конфигурации вы можете обрабатывать то, что хотите, просто будьте осторожны, объект записи возвращается дочернему процессу.
Это также не очень хорошее решение, но вы можете написать свой Java-код для выполнения любого процесса во время транзакций Flume. По каждому событию вы можете проверить каталог и, если файл вам не нужен, вы можете его удалить. (Вы должны быть уверены, что пользователь, который запускает процесс Java, имеет разрешения в этом каталоге)
morphlines : [
{
id : morphline1
importCommands : ["org.kitesdk.**"]
commands : [
{
readJson { }
}
{
java {
imports : """
import java.io.File;
import java.io.IOException;
"""
code : """
try {
// This code from my flume agent, you may want to use it, but it is not necessary
// JsonNode rootNode = (JsonNode) record.getFirstValue(Fields.ATTACHMENT_BODY);
// You can traverse in the relevant directory
// and find the ignored pattern manually
// then you can delete it with java code
//Second part of my code
//String rootNodeStr = rootNode.toString();
//record.put("rootNodeStr", rootNodeStr.getBytes(StandardCharsets.UTF_8));
}
} catch (IOException e) {
logger.error("So sad",e);
}
return child.process(record);
"""
}
}
{
setValues {
_attachment_body : "@{rootNodeStr}"
}
}
]
}
]
Я надеюсь, что это будет полезно.
person
csvital
schedule
26.07.2018