Как обрабатывать несколько входов с помощью Logstash в одном файле?

Допустим, у вас есть 3 разные строки в файле журнала брандмауэра, и вы хотите:

чтобы найти его, и результат будет сохранен в кластере эластичного поиска с использованием специального вывода эластичного поиска.

что мне делать в моем logstash.conf ??

Спасибо.


person Morito    schedule 14.04.2015    source источник


Ответы (2)


Предполагая, что разные журналы поступают из одного и того же источника журналов (то есть из одного файла) и должны рассматриваться как имеющие один и тот же тип (что является суждением), вы можете просто перечислить несколько шаблонов Grok:

filter {
  grok {
    match => ["message", "pattern1", "pattern2", ..., "patternN"]
  }
}

Перечисленные шаблоны будут пробовать по порядку.

Если сообщения журнала поступают с разных входов и совершенно разные, используйте поле типа, чтобы различать разные сообщения:

filter {
  if [type] == "foolog" {
    grok {
       match => ["message", "pattern1"]
    }
  } else if [type] == "barlog" {
    grok {
       match => ["message", "pattern2"]
    }
  }
}

Этот шаблон может быть подходящим также для сообщений, поступающих из одного и того же ввода, но это требует немного больше работы, поскольку вам сначала нужно изучить сообщение в условном выражении, чтобы определить, какой тип выбрать.

person Magnus Bäck    schedule 14.04.2015
comment
Вы действительно очень любезны, потому что всегда отвечаете мне, но на этот раз это не сработает, если вы позволите, я скажу вам, что именно хочу, и попытаюсь дать мне идею, моя проблема в том, что у меня есть файл журнала с очень 3 разных строки, это пример: - person Morito; 14.04.2015
comment
27 февраля 17:38:51 id = firewall time = 27.02.2015 17:38:51 fw = GVGM-LYON tz = + 0200 startime = 27.02.2015 17:38:50 pri = 5 confid = 01 slotlevel = 2 ruleid = 65 srcif = Vlan2 srcifname = ADISTA ipproto = icmp dstif = Vlan2 dstifname = ADISTA icmptype = 8 icmpcode = 0 proto = icmp src = 10.16.82.50 dst = 10.38.252.131 action = block logtype = filter # 015 - person Morito; 14.04.2015
comment
27 февраля 13:17:05 id = firewall time = 27.02.2015 13:17:04 fw = GVGM-LYON tz = + 0200 startime = 27.02.2015 13:17:03 pri = 5 proto = http contentpolicy = 1 ruleid = 3 op = GET result = 302 user = src = 192.168.21.29 srcport = 49575 srcportname = ephemeral_fw_tcp dst = 217.163.21.35 dstport = 80 dstportname = http dstname = ads.yahoo.com modsrc = 94.127.8.15 mods19rcport = 6 origdst = 127.0.0.2 origdstport = 8080 отправлено = 415 rcvd = 0 duration = 0.25 action = pass cat_site = ads arg = / cms / v1% 3Fesig% 3D1% 7E53b2e94cb9cad249de625ca979a6bba56a9267cf% 26nwid% 3D logtype = 3D logtype% 261000095 - person Morito; 14.04.2015
comment
Если вы заметили, что они разные, я получил только первую строку, и если я добавлю вторую, это не сработает, я много пробовал, но, как вы знаете, я новичок и у меня мало идей. Любая помощь, пожалуйста :( - person Morito; 14.04.2015

Глядя на ваши комментарии под постом Магнуса, я могу поделиться более конкретным примером.

Вариант 1) Журналы нашего Fortigate похожи, а соответствующий фильтр Grok выглядит следующим образом:

    grok {
        match => [
            "message" , "%{FORTIGATE_50_BASE} %{FORTIGATE_50_V1}",
            "message" , "%{FORTIGATE_50_BASE} %{FORTIGATE_50_V2}",
            "message" , "%{FORTIGATE_50_BASE} %{FORTIGATE_50_V3}",
            "message" , "%{FORTIGATE_50_BASE}"
        ]
        tag_on_failure => [ "failure_grok_fortigate" ]
        break_on_match => false
    }

И связанные с этим закономерности таковы:

   FORTIGATE_50_BASE %{SYSLOG5424PRI:syslog_index}date=%{FORTIDATE:date} time=%{TIME:time} devname=%{HOST:hostname} devid=%{HOST:devid} logid=%{NUMBER:logid} type=%{WORD:fortigate_type} subtype=%{WORD:subtype} level=%{WORD:loglevel} vd=\"?%{WORD:vdom}\"?
   FORTIGATE_50_V1 srcip=%{IP:srcip} srcintf=\"%{HOST:srcintf}\" dstip=%{IP:dstip} dstintf=\"%{HOST:dstintf}\" sessionid=%{NUMBER:sessionid} status=%{DATA:status} policyid=%{DATA:policyid} dstcountry=\"%{DATA:dstcountry}\" srccountry=\"%{DATA:dstcountry}\" trandisp=%{WORD:trandisp} service=%{WORD:service} proto=%{INT:proto} app=%{WORD:app} duration=%{INT:duration} sentbyte=%{INT:sentbyte} rcvdbyte=%{INT:rcvdbyte} sentpkt=%{INT:sentpkt} rcvdpkt=%{INT:rcvdpkt}
   FORTIGATE_50_V2 user=\"%{PROG:user}\" ui=%{GREEDYDATA:ui} msg=\"%{GREEDYDATA:msg}\"
   FORTIGATE_50_V3 action=\"%{PROG:action}\" tunneltype=\"%{PROG:tunneltype}\" tunnel_id=%{NUMBER:tunnel_id} remote_ip=(%{IP:remote_ip}|\(null\)) tunnel_ip=(%{IP:tunnel_ip}|\(null\)) user=\"%{PROG:user}\" group=\"%{PROG:group}\" dst_host=\"%{PROG:dst_host}\" reason=\"%{PROG:reason}\" msg=\"%{GREEDYDATA:msg}\"

Таким образом, существует базовый шаблон, общий для всех журналов и некоторых конкретных частей, и способ настройки фильтра grok заключается в том, чтобы сначала обрабатывать более конкретные шаблоны и возвращаться к общему.

Вариант 2) Журналы очень похожи на списки значений ключей, поэтому, возможно, более простой kvfilter поможет вам гораздо больше, чем любой шаблон.

В вашем случае это может быть так просто:

input {
 ...
}
filter {
  kv { }
  date { ... pull in the date data ... }
}
output {
  ..
}

Надеюсь, что это сработает. В общем, мне также было очень полезно иметь unittests теперь доступен формат журнала.

Ваше здоровье

person pagid    schedule 14.04.2015
comment
К сожалению, фильтр kv не будет работать полностью из коробки, поскольку в них есть значения с пробелами. Эти значения заключены в двойные кавычки, но фильтр kv не поддерживает кавычки. - person Magnus Bäck; 15.04.2015