Как настроить Apache Flume для удаления файлов, игнорируемых свойством ignorePattern

У меня есть данные, поступающие в spooldir, и я собираю их с помощью Flume и перенаправляю дальше для некоторой обработки.

Есть некоторые файлы, которые не требуются, поэтому я использую свойство igonorePattern в флюме, чтобы их не подбирали.

Но проблема в том, что есть равное количество обязательных и ненужных файлов, которые я получаю, и я не контролирую входящие данные, поэтому я должен принимать все, что я получаю в spooldir.

Поскольку у меня довольно много этих ненужных файлов, у меня нет места на диске для их хранения в течение длительного времени. Таким образом, мне было интересно, есть ли у Flume способ автоматически удалять эти файлы так же, как и для всех файлов .COMPLETED (да, я удаляю файлы, которые подбирает flume)


person Karanjit Singh Tiwana    schedule 25.07.2018    source источник


Ответы (1)


Источник каталога буферизации Flume не имеет возможности удалять игнорируемые файлы. Удаляет сразу/никогда только обработанные файлы.

Есть три способа получить решение этой проблемы.

Во-первых, вы можете решить проблему явно (с помощью сценария оболочки или любой другой небольшой программы, которая может найти файл, игнорирующий шаблон, и удалить его). На мой взгляд, это не лучший способ сделать это.

Во-вторых, вы можете написать свой собственный источник каталога буферизации с реализацией исходного интерфейса Flume. Это требует больших усилий и сложной задачи для такого рода небольшой проблемы.

В-третьих, оскорбительное решение, вы можете использовать Morphline Interceptor. Перехватчик Morphline упоминается в этой части руководства пользователя Flume. Также вы можете взглянуть на Справочник по морфлайнам

Перехватчики получают событие из источника, выполняют некоторую обработку и, наконец, пересылают его в канал, как вы знаете.

Если вы выберете третье решение, для этого вам нужно использовать kite-sdk. Вы должны добавить зависимость Cloudera Kite Morphlines Core к вашему FLUME_CLASSPATH с помощью flume-env.sh или просто добавьте банку в $APACHE_FLUME_HOME/lib

В этом решении ваш пример конфигурации Flume будет:

a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
a1.sources.src-1.interceptors = morph

a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /spool/dir
a1.sources.src-1.fileHeader = true
a1.sources.src-1.ignoredPattern = 'whatever'

a1.sources.src-1.interceptors.morph.type = org.apache.flume.sink.solr.morphline.MorphlineInterceptor$Builder
a1.sources.src-1.interceptors.morph.morphlineFile = /etc/flume-ng/conf/morphline.conf
a1.sources.src-1.interceptors.morph.morphlineId = morphline1

a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = ch-1
a1.sinks.k1.sink.directory = /roll/dir

Затем вы можете создать собственный файл перехватчика morphline как $APACHE_FLUME_HOME/conf/morphline.conf.

В этом файле конфигурации вы можете обрабатывать то, что хотите, просто будьте осторожны, объект записи возвращается дочернему процессу.

Это также не очень хорошее решение, но вы можете написать свой Java-код для выполнения любого процесса во время транзакций Flume. По каждому событию вы можете проверить каталог и, если файл вам не нужен, вы можете его удалить. (Вы должны быть уверены, что пользователь, который запускает процесс Java, имеет разрешения в этом каталоге)

morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**"]
    commands : [
      {
        readJson { }
      }
      {
        java {
          imports : """
            import java.io.File;
            import java.io.IOException;
          """
          code : """
            try {
                // This code from my flume agent, you may want to use it, but it is not necessary
                // JsonNode rootNode = (JsonNode) record.getFirstValue(Fields.ATTACHMENT_BODY);    

                // You can traverse in the relevant directory
                // and find the ignored pattern manually
                // then you can delete it with java code

                //Second part of my code
                //String rootNodeStr = rootNode.toString();
                //record.put("rootNodeStr", rootNodeStr.getBytes(StandardCharsets.UTF_8));
              }
            } catch (IOException e) {
              logger.error("So sad",e);
            }
            return child.process(record);
          """
        }
      }
      {
        setValues {
          _attachment_body : "@{rootNodeStr}"
        }
      }
    ]
  }
]

Я надеюсь, что это будет полезно.

person csvital    schedule 26.07.2018