Die Basis-Suche & das Pipe |
Merke: Alles vor der ersten Pipe ist die Basis-Suche. Danach kommt | + Befehl. Wie Unix-Pipes.
Nur Basis-Suche (Rohdaten)
index=firewall action=blocked
Basis-Suche + eine Pipe
index=firewall action=blocked
| stats count by src_ip
Verkettete Pipes
index=firewall action=blocked
| stats count by src_ip
| sort -count
| head 10
Tipp: Filtere so früh wie möglich in der Pipe-Kette → weniger Events zu verarbeiten = schneller.
Implizites AND zwischen Termen
Mehrere Begriffe ohne Operator werden mit AND verknüpft. Splunk fügt das AND implizit ein.
Diese Queries sind äquivalent
index=windows EventCode=4625 User=admin
index=windows EventCode=4625 AND User=admin
Logische Operatoren
| Operator | Beispiel | Bedeutung |
AND | error AND timeout | Beide müssen vorkommen |
OR | error OR warning | Eines muss vorkommen |
NOT | NOT error | Darf nicht vorkommen |
( ) | (A OR B) AND C | Gruppierung / Vorrang |
Beispiel mit Gruppierung
index=auth (failed OR error) AND NOT src_ip=10.0.0.1
Index – Datenquelle bestimmen
Der Index ist der primäre Filter. Ohne index= durchsucht Splunk alle Indizes, auf die du Zugriff hast — teuer!
Syntax-Varianten
index=windows
index=windows OR index=linux
index=prod_*
index=*
Typische CSIRT-Indizes
Firewall / NSM
index=network
Index + Sourcetype kombinieren
index=windows sourcetype=WinEventLog:Security
index=network sourcetype=cisco:asa
Metadata – Indizes & Sourcetypes entdecken
Was ist metadata? Ein spezieller SPL-Befehl, der nicht Events durchsucht, sondern die Metadaten der Indizes abfragt – z.B. welche Sourcetypes, Hosts oder Sources existieren, wann zuletzt Daten kamen, und wie viele Events vorhanden sind. Extrem schnell, da keine Event-Daten gelesen werden.
Alle verfügbaren Indizes anzeigen
| metadata type=indexes
| table title, totalCount, recentTime
| eventcount summarize=false index=*
| dedup index
| table index
Alle Sourcetypes eines Index anzeigen
| metadata type=sourcetypes index=botsv1
index=botsv1 | stats count by sourcetype
Alle Indizes mit ihren Sourcetypes (Gesamtübersicht)
| metadata type=sourcetypes index=*
| eval last_seen=strftime(recentTime, "%Y-%m-%d %H:%M:%S")
| table index, sourcetype, totalCount, last_seen
| sort index
Metadata-Typen
| type= | Zeigt | Typischer Use Case |
indexes | Alle Indizes | Welche Datenquellen existieren? |
sourcetypes | Alle Sourcetypes | Welche Log-Formate sind vorhanden? |
sources | Alle Source-Pfade | Woher kommen die Daten? |
hosts | Alle Hosts | Welche Systeme liefern Logs? |
Tipp: metadata ist ideal, um sich in einer unbekannten Splunk-Umgebung schnell zurechtzufinden – z.B. bei BOTSv1/v2/v3 Datasets.
Zeitraum & Time Handling
index=windows earliest=-24h latest=now
index=windows earliest=-7d@d latest=@d
index=windows
earliest="2025-06-01T00:00:00"
latest="2025-06-11T23:59:59"
| Kürzel | Bedeutung |
-1h | Letzte Stunde |
-24h | Letzte 24 Stunden |
-7d@d | 7 Tage, auf Tagesbeginn gerundet |
@w0 | Anfang dieser Woche (Sonntag) |
@mon | Anfang dieses Monats |
Zeitkonvertierung in der Pipeline
| eval human = strftime(_time, "%Y-%m-%d %H:%M:%S %Z")
| eval ts = strptime(date, "%d/%b/%Y:%H:%M:%S")
| eval minuten = round((end-start)/60, 2)
| bin _time span=5m
| stats count by _time
Wichtige SPL-Befehle
| Befehl | Beschreibung | Beispiel |
stats | Aggregation: count, sum, avg, dc, values | stats count by host |
table | Felder als Tabelle ausgeben | table _time, host, status |
fields | Felder ein-/ausschließen | fields + host, status |
where | Filtert nach Ausdruck auf Feldern | where len(user) > 3 |
search | Volltext-Filter in der Pipeline | search status=500 |
eval | Neue Felder berechnen / transformieren | eval gb=bytes/1GB |
rex | Regex-Extraktion auf Felder | rex "user=(?<u>\w+)" |
dedup | Duplikate entfernen | dedup src_ip |
sort | Sortieren (- = absteigend) | sort -count, +host |
head/tail | Erste / letzte N Ergebnisse | head limit=20 |
rename | Felder umbenennen | rename data.user as user |
top/rare | Häufigste / seltenste Werte | top limit=10 useragent |
timechart | Zeitbasierte Aggregation für Charts | timechart count by status |
chart | Statistik-Tabelle mit Split | chart avg(cpu) over host |
lookup | Externe Tabelle joinen (IOC) | lookup ioc.csv ip OUTPUT cat |
transaction | Events zu Sessions gruppieren | transaction session maxspan=30m |
eventstats | Statistiken als Felder hinzufügen | eventstats avg(rt) by host |
streamstats | Laufende Summen / Kumulativ | streamstats sum(bytes) as total |
bin | Werte in Buckets gruppieren | bin _time span=5m |
tstats | High-Perf. auf indizierten Feldern | tstats count where index=web |
appendpipe | Summenzeile ans Ergebnis anhängen | appendpipe [stats sum(count)] |
fillnull | Null-Werte ersetzen | fillnull value="N/A" |
mvexpand | Multivalue in Einzel-Events | mvexpand recipients |
inputlookup | CSV als Datenquelle laden | inputlookup employees.csv |
outputlookup | Ergebnisse in CSV schreiben | outputlookup results.csv |
Praxis-Queries (CSIRT)
Brute-Force Logins erkennen
index=windows sourcetype=WinEventLog:Security EventCode=4625
| stats count by src_ip, Account_Name
| where count > 10
| sort -count
PowerShell Execution mit Encoded Command
index=edr process_name="powershell.exe" CommandLine=*-EncodedCommand*
| table _time, host, user, CommandLine
| sort -_time
Lateral Movement via Remote Services (4648)
index=windows EventCode=4648 earliest=-1h
| eval pair=src_user." → ".dest_host
| stats count values(pair) as targets by src_host
| where count > 3
IOC-Lookup gegen Threat Intel Liste
index=network sourcetype=firewall
| lookup malicious_ips.csv dest_ip OUTPUT threat_category
| where isnotnull(threat_category)
| table _time, src_ip, dest_ip, threat_category
Neues Gerät im Netz (first seen)
index=network earliest=-30d
| stats min(_time) as first_seen by src_mac, src_ip
| where first_seen > relative_time(now(), "-24h")
| convert ctime(first_seen)
Wildcards & Feldsuche
process_name=powershell*
CommandLine=*mimikatz*
src_ip=*
NOT error_code=*
Performance: Wildcards am Anfang (*xyz) erzwingen einen Full-Scan. Wenn möglich vermeiden oder mit Index-Filterung kombinieren.
eval – Felder berechnen
| eval asset=host.":".src_ip
| eval severity=if(count>100, "HIGH", "LOW")
| eval risk=case(
score>80, "Critical",
score>50, "Medium",
1==1, "Low"
)
| eval user=coalesce(username, email, "anonymous")
| eval human_time=strftime(_time, "%Y-%m-%d %H:%M:%S")
| eval epoch=strptime(date_field, "%d/%b/%Y:%H:%M:%S")
| eval gb=bytes/1024/1024/1024
| eventstats avg(rt) as avg_rt, stdev(rt) as stdev_rt
| where rt > (avg_rt + 2*stdev_rt)
String-Operationen
Konkatenation
| eval full_name = first_name . " " . last_name
Substring
| eval short = substr(word, 1, 3)
Regex Replacement (rex mode=sed)
| rex mode=sed field=msg "s/\s+/ /g"
| rex mode=sed field=email "s/@/ [at] /"
replace – einfache Ersetzung
| replace "localhost" with "127.0.0.1" in host
Whitespace ignorieren beim Match
| rex field=MessageStatus "(?<status>\w+)"
| eval ok=if(status="delivered", "yes", "no")
Multivalue-Felder
| mvexpand recipients
| eval first = mvindex(items, 0)
| eval userlist = mvjoin(users, ", ")
| eval errors = mvfilter(match(msgs, "ERROR"))
Filterung: where vs search
where – Ausdrücke & Funktionen
| where isnotnull(user) AND len(user) > 3
| where like(url, "%/api/%")
search – Keyword-Matching mit Wildcards
| search user=admin* status!=404
NOT vs != (wichtiger Unterschied!)
status!=500
NOT status=500
Merke: != filtert nur explizite Werte. Mit NOT werden auch Events ohne das Feld ausgeschlossen. Für "alles außer X mit Feld" nimm !=; für "garantiert keine X" nimm NOT.
Analyse: Perzentile & Ausreisser
Perzentile
index=web
| stats avg(rt) as avg_rt,
median(rt) as p50_rt,
perc95(rt) as p95_rt,
perc99(rt) as p99_rt
by endpoint
Ausreisser mit Standardabweichung
index=web
| eventstats avg(rt) as avg_rt, stdev(rt) as stdev_rt
| where rt > (avg_rt + 2*stdev_rt)
Statistische Verteilung
| stats count, dc(src_ip) as unique_visitors,
values(useragent) as ua_list by endpoint
Lookup-Tabellen
CSV-Lookup zur Anreicherung
index=network
| lookup threat_intel.csv ip as src_ip
OUTPUT threat_level, category
Lookup als Datenquelle nutzen
| inputlookup employees.csv
| search department="IT"
Ergebnisse in Lookup schreiben
index=web status=500
| stats count by src_ip
| where count > 100
| outputlookup suspicious_ips.csv
Use Case: IOC-Liste als CSV anlegen und per lookup gegen Firewall-Logs joinen → sofortige Trefferanzeige.
Transaktionen
Session Grouping (teuer)
index=web
| transaction session_id maxspan=30m maxpause=5m
| where duration > 60
| table session_id, duration, eventcount
Alternative mit stats (schneller)
| stats min(_time) as start, max(_time) as end,
count as events by session_id
| eval duration = end - start
| where duration > 60
Performance: transaction ist ressourcenintensiv. Nutze stats + min(_time) / max(_time) wenn möglich.
Mehrere Queries kombinieren
Subsearch
index=web user=[search index=vpn
| dedup user | fields user]
| stats count by user
Join
index=main
| join type=left user [
search index=hr
| stats count by user
]
Append
index=web status=500
| stats count as errors by host
| append [
search index=web status=200
| stats count as success by host
]
Tipp: Subsearch-Ergebnisse werden auf 10.000 Events begrenzt (einstellbar mit format). Subsearch immer in eckige Klammern [ ] setzen!
Performance-Tipps
index=web sourcetype="access_log" status=500
| fields host, status, response_time
| stats avg(response_time) by host
| tstats count where index=web by host, status
| stats min(_time) as start by session
Reguläre Ausdrücke (rex)
Felder extrahieren
| rex field=_raw "user=(?<username>\w+)"
| rex field=url "\/api\/(?<version>v\d+)\/(?<endpoint>\w+)"
Mehrere Treffer pro Event (max_match)
| rex field=_raw max_match=0
"error_code=(?<codes>\d+)"
Häufige Regex-Patterns
(?<ip>\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})
(?<email>[\w.+-]+@[\w-]+\.\w+)
(?<uuid>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})
(?<path>\/[\w\/.-]+)
(?<domain>(?:https?:\/\/)?([\w.-]+)\/)
Tips & Tricks
|
|
Macro
`get_errors(index)`
Schnelle Feldübersicht
| fieldsummary
| top limit=20 useragent
| highlight error, warning, critical
Alerting – Error Rate überwachen
index=web
| bin _time span=5m
| stats count(eval(status>=500)) as errors,
count as total by _time
| eval error_rate = round(errors/total*100, 2)
| where error_rate > 5
Nützliche SPL-Idiome
| fillnull value="N/A" username, email
| eval users = mvjoin(mvdedup(user_list), ", ")
| stats count(eval(status="success")) as successes,
count(eval(status="failure")) as failures
| sort -_time | dedup src_ip
| rename signals.ip_address as ip_addr
| eval ip_addr = if(isnull(ip_addr), "unknown", ip_addr)