Firewall-Logs mit Logstash und Kibana visualisieren

Erste Erfahrungen mit dem ELK-Stack

Da ich mit dem Thema Log­da­tei­en-Ana­ly­se zu tun hatte, habe ich das als Anlaß ge­nom­men, mir mal den ELK-Stack (Elas­tic­se­arch, Logs­tash, Ki­ba­na) an­zu­se­hen. Das ganze ist zwar zum zen­tra­li­sier­ten Sam­meln von Sys­tem­logs ge­dacht, läßt sich aber auch zum Ana­ly­sie­ren be­ste­hen­der Log­files nut­zen. Als „Fin­ger­übung“ dien­te die Fra­ge­stel­lung: Woher kom­men denn die Leute, die re­gel­mä­ßig von de­ny­hosts in mei­nen Log­da­tei­en ver­ewigt wer­den? ;-)

Logs­tash

Logs­tash ist so­zu­sa­gen die „au­to­ma­ti­sier­te Mist­ga­bel“ für Logs. Logs­tash zapft In­put-Quel­len an (ty­pi­scher­wei­se Da­tei­en, kann aber auch als Sys­log-Da­e­mon oder ähn­li­ches die­nen), be­rei­tet sie in das ge­wünsch­te For­mat auf und schreibt das dann in ein oder meh­re­re Da­ten­sen­ken (beim klas­si­schen ELK-Stack: Elas­tic­se­arch). In mei­nem Fall woll­te ich die Log­aus­ga­be des de­ny­hosts-Da­e­mon sowie die Log­aus­ga­ben der UFW im sys­log par­sen.

Da sich das de­ny­hosts-For­mat als etwas tri­cky her­aus­stell­te, hier zu­nächst mal die Kon­fi­gu­ra­ti­on für UFW. In mei­ner sys­log-Kon­fi­gu­ra­ti­on lan­den diese be­reits in einer se­pa­ra­ten Datei. Also kon­fi­gu­riert man einen Input aus einer Datei und gibt dem gan­zen einen Ein­ga­be-Typ (selbst de­fi­niert, dient als Mar­ker für die Fil­ter­re­geln):

input {
  file {
    path => "/var/log/iptables.log"
    type => "iptables"
  }
}

Jede Zeile, die hier ge­le­sen wird, dient als Aus­gangs­ma­te­ri­al für einen Log­ein­trag. Das For­mat sieht bei UFW-Logs fol­gen­der­ma­ßen aus:

Jul  6 03:22:08 localhost kernel: [1746862.282899] [UFW BLOCK] IN=eth0 OUT= MAC=de:ad:be:ef:12:34:56:78:00:00:00:00:00:00 SRC=1.2.3.4 DST=2.3.4.5 LEN=40 TOS=0x00 PREC=0x00 TTL=249 ID=4368 PROTO=TCP SPT=45983 DPT=3389 WINDOW=1024 RES=0x00 SYN URGP=0

Der String wird nun an­hand einer Fil­ter­re­gel auf­ge­bro­chen:

filter {
  if [type] == "iptables" {
    grok {
      match => { "message" => "%{SYSLOGTIMESTAMP:timestamp} %{SYSLOGHOST:hostname}.*UFW BLOCK.*IN=%{NOTSPACE:dev_in}.*MAC=(?<mac>[0-9A-F]{2}(:[0-9a-fA-F]{2})*) SRC=%{IP:src_ip} DST=%{IP:dst_ip}.*SPT=%{NUMBER:src_port:int} DPT=%{NUMBER:dst_port:int}" }
    }
  }
}

grok ist die „ei­er­le­gen­de Woll­milch­sau“ von logs­tash. Beim Ent­wer­fen der Re­geln ist die grok-De­bug­ging­sei­te sehr hilf­reich, hier kann man seine Re­geln mit Test­ein­ga­ben so lange aus­pro­bie­ren, bis alles paßt.

logs­tash kennt als Da­ten­ty­pen zu­nächst nur String und Int; wenn man nichts wei­ter un­ter­nimmt, wird ein Feld als String in­ter­pre­tiert. In obi­gem Bei­spiel sieht man bei den Fel­dern src_port und dst_port, daß diese ex­pli­zit als In­te­ger in­ter­pre­tiert wer­den sol­len.

Für eine schö­ne Aus­wer­tung soll­te aber der Zeits­tem­pel auch als Zeit in­ter­pre­tiert wer­den! Dies er­folgt mit einem wei­te­ren Fil­ter:

date {
  match => [ "timestamp", "MMM  d HH:mm:ss", "MMM dd HH:mm:ss" ]
}

Damit wären die Daten zu­mi­dest ru­di­men­tär auf­be­rei­tet und in ein­zel­ne Fel­der auf­ge­trennt. Nun sol­len diese in eine lokal lau­fen­de Elas­tic­se­arch-In­stanz ge­schrie­ben wer­den:

output {
  elasticsearch {
    host => "127.0.0.1"
  }
}

Geo-In­for­ma­tio­nen

Um die Ana­ly­se nach dem Ur­sprung zu er­mög­li­chen, muß die zu­ge­hö­ri­ge Geo-In­for­ma­ti­on hin­zu­ge­fügt wer­den. Hier­für bie­tet logs­tash den geoip-Fil­ter, der an­hand einer IP zu­ge­hö­ri­ge Geo-In­for­ma­tio­nen er­gänzt.

if [src_ip]  {
  geoip {
    source => "src_ip"
    target => "geoip"
    add_field => [ "[geoip][coordinates]", "%{[geoip][longitude]}" ]
    add_field => [ "[geoip][coordinates]", "%{[geoip][latitude]}"  ]
  }
  mutate {
    convert => [ "[geoip][coordinates]", "float" ]
  }
}

Auf diese Fil­ter­re­gel bin ich auf die­ser Seite ge­sto­ßen, der Autor ver­such­te, die Quel­len von Apa­che-Zu­grif­fen und Mails, die im Gray­lis­ting lan­den, auf einer Karte ab­zu­tra­gen.

Er­neu­tes Ein­le­sen von Log­da­tei­en

Für einen file-In­put ist das Stan­dard­ver­hal­ten, am Ende der Datei zu be­gin­nen, also auf neue Daten zu war­ten und die vor­han­de­nen Daten zu igno­rie­ren. Möch­te man eine Datei voll­stän­dig ein­le­sen, muß man dies ex­pli­zit spe­zi­fi­zie­ren:

file {
  path => "/var/log/iptables.log"
  type => "iptables"
  start_position => "beginning"
}

Dies sorgt dafür, daß die Datei kom­plett ge­parst wird – zu­min­dest beim ers­ten Mal. An­schlie­ßend merkt sich logs­tash die letz­te Lesepo­si­ti­on. Ge­ra­de beim Ein­rich­ten von neuen Re­geln möch­te man manch­mal aber das kom­plet­te Neu­ein­le­sen er­zwin­gen. Die letz­te Lesepo­si­ti­on wird stan­dard­mä­ßig in den Da­tei­en $HOME/.sincedb* ge­spei­chert, löscht man diese, wird die Datei noch­mals kom­plett ge­le­sen.

Ein ei­ge­ner Fil­ter

Bei den Logs von de­ny­hosts wird es etwas trick­rei­cher, diese sehen bei­spiels­wei­se fol­gen­der­ma­ßen aus:

2014-07-06 19:26:36,334 - denyhosts   : INFO     new denied hosts: ['1.2.3.4', '9.8.7.6', '2.4.6.8' ]

Das heißt, zu einer Zeile müs­sen meh­re­re Log­ein­trä­ge er­zeugt wer­den. Mein An­satz: Zu­nächst die Zeile in Fel­der auf­tei­len, an­schlie­ßend aus der Liste von IPs se­pa­ra­te Ein­trä­ge mit nur einer IP ge­ne­rie­ren. Zu­nächst also das Auf­tei­len:

if [type] == "denyhosts" {
  grep {
    match => { "message" => "new denied hosts" }
  }
  grok {
    match => { "message" => "(?<timestamp>[0-9]{4}-[0-1][0-9]-[0-3][0-9] [0-2][0-9]:[0-5][0-9]:[0-5][0-9]).*new denied hosts: \[(?<src_ip>[^\]]*)\]" }
  }
  date {
    match => [ "timestamp", "yyyy-MM-dd HH:mm:ss" ]
  }
}

Der grep-Fil­ter stammt aus dem logs­tash-cont­rib-Pa­ket und ist an sich depre­ca­ted, aber oh my… ;-)

Nun ent­hält das Feld src_ip den In­halt der ecki­gen Klam­mern oben. Der Ru­by-Be­fehl scan könn­te hier den Job er­le­di­gen, al­ler­dings gibt es hier­für kein pas­sen­des Fil­ter-Plu­gin. Glück­li­cher­wei­se ist das Schrei­ben eines sol­chen logs­tash-Plug­ins wirk­lich sehr ein­fach:

require "logstash/filters/base"
require "logstash/namespace"

class LogStash::Filters::Scan < LogStash::Filters::Base

  config_name "scan"
  milestone 1

  config :field, :validate => :string, :required => true
  config :regex, :validate => :string, :required => true

  public
  def register
    # Nothing to do
  end

  public
  def filter(event)
    return unless filter?(event)

    r=Regexp.new(@regex)
    elements = event[@field].scan(r)

    event[@field] = elements.shift

    elements.each do |element|
      new_event = event.clone
      new_event[@field] = element
      filter_matched(new_event)
      yield new_event
    end
  end
end

In vie­len an­de­ren Bei­spie­len für sol­che Fil­ter sieht man, daß er ur­sprüng­li­che Event mit­tels clone du­pli­ziert und die neuen Events mit filter_matched und yield zu­rück­ge­lie­fert wer­den; an­schlie­ßend wird der ur­sprüng­li­che Event mit­tels cancel ver­wor­fen. Dies hat bei mi nicht funk­tio­niert: Die neuen Events wur­den von der wei­te­ren Fil­ter­ket­te nicht be­ar­bei­tet, d.h. das Hin­zu­fü­gen der Geo-In­for­ma­tio­nen blieb aus. Des­halb der obige An­satz, bei dem er ur­sprüng­li­che Event er­hal­ten bleibt.

Der neue Fil­ter wird nun fol­gen­der­ma­ßen ein­ge­setzt:

scan {
  field => "src_ip"
  regex => "\d*\.\d*\.\d*\.\d*"
}

Damit wären die Re­geln kom­plett – die ge­sam­te Datei kann hier her­un­ter­ge­la­den wer­den.

Ki­ba­na

Ki­ba­na selbst ist eine reine html-Sei­te, die sämt­li­che Dar­stel­lungs-Ma­gie mit­tels Ja­va­script und Zu­grif­fen auf die Elas­tic­se­arch-Web-API er­le­digt. Hier kann man nun durch Hin­zu­fü­gen einer map zum Stan­dard-Pa­nel die Daten nach Geo-In­for­ma­ti­on aus­wer­ten:

Fazit: Coole Tools, viel Spaß, gra­fi­scher Bling – und die Chi­ne­sen sind of­fen­bar nicht an allem Schind­lu­der im Netz schuld ;-)