Форматирование сериализатора Apache Flume HDFS

Я только начинаю работать с Flume, и мне нужно вставить некоторые заголовки в приемник hdfs.

У меня это работает, хотя формат неправильный, и я не могу управлять столбцами.

Используя эту конфигурацию:

a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = syslogudp
a1.sources.r1.host = 0.0.0.0
a1.sources.r1.port = 44444

a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
a1.sources.r1.interceptors.i1.preserveExisting = false
a1.sources.r1.interceptors.i1.hostHeader = hostname

a1.sources.r1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
a1.sources.r1.interceptors.i2.preserveExisting = false

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://localhost:9000/user/vagrant/syslog/%y-%m-%d/
a1.sinks.k1.hdfs.rollInterval = 120
a1.sinks.k1.hdfs.rollCount = 100
a1.sinks.k1.hdfs.rollSize = 0
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text

a1.sinks.k1.serializer = header_and_text
a1.sinks.k1.serializer.columns = timestamp hostname
a1.sinks.k1.serializer.format = CSV
a1.sinks.k1.serializer.appendNewline = true

a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Журналы, записанные в HDFS, в основном в порядке, за исключением сериализованных аспектов:

{timestamp=1415574695138, Severity=6, host=PolkaSpots, Facility=3, hostname=127.0.1.1} hostapd: wlan0-1: STA xx WPA: group key handshake completed (RSN)

Как я могу отформатировать журналы, чтобы они выглядели так:

1415574695138 127.0.1.1 hostapd: wlan0-1: STA xx WPA: group key handshake completed (RSN)

Сначала метка времени, затем имя хоста, а затем тело сообщения системного журнала.


person simonmorley    schedule 09.11.2014    source источник


Ответы (1)


Причина этого в том, что два настроенных вами перехватчика записывают значения в заголовки событий Flume, которые сериализуются в тело с помощью HeaderAndBodyTextEventSerializer. Последний просто делает это:

public void write(Event e) throws IOException {
    out.write((e.getHeaders() + " ").getBytes());
    out.write(e.getBody());
    if (appendNewline) {
      out.write('\n');
    }
  }

делегирование e.getHeaders() приведет к сериализации карты только в строку JSON.

Чтобы решить эту проблему, я бы предложил создать собственный сериализатор и перегрузить метод write() для форматирования вывода в значения, разделенные табуляцией. В этом случае вам просто нужно указать путь к вашему классу в:

a1.sinks.k1.serializer = com.mycompany.MySerlizer

и поместите банку в путь класса Flume.

person Erik Schmiegelow    schedule 15.02.2015
comment
Спасибо за ответ, я уже ничего не ждал. На данный момент отвлеклись от этого, но вскоре постараемся взглянуть на ваше решение. Спасибо. - person simonmorley; 16.02.2015