Logstash Filter for Processing sFlow FLOW records
#################
# Sflow Filters #
#################
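# This filter expects the CSV-style lines that sflowtool emits.
# One way to feed it (a sketch only; the path, port, and choice of
# the pipe input are assumptions, so adjust for your setup) is:
#
#   input {
#     pipe {
#       command => "/usr/local/bin/sflowtool -l -p 6343"
#       type    => "sflow"
#     }
#   }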
filter {
  if [type] == "sflow" {
    # sFlow sends two kinds of messages - CNTRs and FLOWs.
    # I'm not doing anything with CNTRs at this point, so
    # I drop those, and we concentrate on processing FLOW
    # messages.
    if [message] =~ /CNTR/ {
      drop { }
    }
    # sFlow FLOW messages break down into the following fields.
    # I have named them arbitrarily, but they correspond to the
    # actual field names. I developed this grok pattern using
    # two tools:
    #
    # - sflowtool (http://www.inmon.com/technology/sflowTools.php)
    #   Written by InMon, it provides a way to examine sFlow messages
    #   in human-readable format.
    #
    # - Grok Debugger (https://grokdebug.herokuapp.com/)
    #   Invaluable, and self-explanatory.
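    # For orientation, here is an illustrative FLOW line in the
    # shape sflowtool prints (the values are invented for this
    # example, not taken from a real capture):
    #
    #   FLOW,192.0.2.1,5,12,001122334455,66778899aabb,0x0800,10,10,192.0.2.10,192.0.2.20,6,0x00,64,49152,443,0x18,1518,1500,512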
    grok {
      match => { "message" => "%{WORD:SampleType},%{IP:sflow.ReporterIP},%{WORD:sflow.inputPort},%{WORD:sflow.outputPort},%{WORD:sflow.srcMAC},%{WORD:sflow.dstMAC},%{WORD:sflow.EtherType},%{NUMBER:sflow.in_vlan},%{NUMBER:sflow.out_vlan},%{IP:sflow.srcIP},%{IP:sflow.dstIP},%{NUMBER:sflow.IPProtocol},%{WORD:sflow.IPTOS},%{WORD:sflow.IPTTL},%{NUMBER:sflow.srcPort},%{NUMBER:sflow.dstPort},%{DATA:sflow.tcpFlags},%{NUMBER:sflow.PacketSize},%{NUMBER:sflow.IPSize},%{NUMBER:sflow.SampleRate}" }
    }
    # Sometimes it doesn't work out.
    if "_grokparsefailure" in [tags] {
      drop { }
    }
    # Because I'll ultimately be displaying all of this
    # in Kibana, I want to translate many of the IP addresses
    # into recognizable hostnames. We take the IP fields,
    # and copy them into new fields that we'll be doing DNS
    # lookups on:
    mutate {
      add_field => {
        "[sflow.SrcHostname]" => "%{sflow.srcIP}"
        "[sflow.DstHostname]" => "%{sflow.dstIP}"
        "[sflow.ReporterName]" => "%{sflow.ReporterIP}"
      }
    }
    # I also want to translate things like Source and
    # Destination port numbers into known service names.
    # For this to work, you have to build some YAML files,
    # which basically map everything you'd find in an
    # /etc/services file. I'll post my YAML files in
    # other Gists. (A few illustrative entries follow the
    # translate blocks below.)
    translate {
      field => "[sflow.srcPort]"
      destination => "[sflow.SrcSvcName]"
      dictionary_path => "/etc/logstash/dictionaries/iana_services.yaml"
    }
    translate {
      field => "[sflow.dstPort]"
      destination => "[sflow.DstSvcName]"
      dictionary_path => "/etc/logstash/dictionaries/iana_services.yaml"
    }
    translate {
      field => "[sflow.IPProtocol]"
      destination => "[sflow.ProtName]"
      dictionary_path => "/etc/logstash/dictionaries/iana_protocols.yaml"
    }
    translate {
      field => "[sflow.tcpFlags]"
      destination => "[sflow.TCPFlagDecode]"
      dictionary_path => "/etc/logstash/dictionaries/tcp_flags.yaml"
    }
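    # For reference, the translate dictionaries are plain key/value
    # YAML. A few illustrative entries (layout assumed; build yours
    # from /etc/services or the IANA registries) for
    # iana_services.yaml:
    #
    #   "22": ssh
    #   "80": http
    #   "443": https
    #
    # ...and for iana_protocols.yaml:
    #
    #   "6": TCP
    #   "17": UDP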
    # Here we do our DNS reverse lookups:
    dns {
      reverse => [ "sflow.SrcHostname" ]
      action => "replace"
    }
    dns {
      reverse => [ "sflow.DstHostname" ]
      action => "replace"
    }
    dns {
      reverse => [ "sflow.ReporterName" ]
      action => "replace"
    }
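    # (Optional) Three lookups per event can get expensive on a
    # busy network. The dns filter supports result caching; a
    # hedged example, assuming your plugin version has these
    # options:
    #
    #   dns {
    #     reverse           => [ "sflow.SrcHostname" ]
    #     action            => "replace"
    #     hit_cache_size    => 4096
    #     failed_cache_size => 512
    #   }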
    # If the reverse lookup didn't fail (leaving us with
    # nothing but numerics) we grok the FQDN, parsing it
    # into a HOST and DOMAIN - and then we store the
    # HOST portion into a new field - sflow.SrcHost
    #
    # Otherwise, we just dump the unresolved IP address
    # into the new sflow.SrcHost field.
    if [sflow.SrcHostname] !~ /(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)/ {
      grok {
        match => [ "[sflow.SrcHostname]", "%{DATA:src_host}\.%{GREEDYDATA:domain}" ]
        add_field => { "[sflow.SrcHost]" => "%{src_host}" }
      }
    } else {
      mutate {
        add_field => [ "[sflow.SrcHost]", "%{sflow.SrcHostname}" ]
      }
    }
    if [sflow.DstHostname] !~ /(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)/ {
      grok {
        match => [ "[sflow.DstHostname]", "%{DATA:dst_host}\.%{GREEDYDATA:domain}" ]
        add_field => { "[sflow.DstHost]" => "%{dst_host}" }
      }
    } else {
      mutate {
        add_field => [ "[sflow.DstHost]", "%{sflow.DstHostname}" ]
      }
    }
    if [sflow.ReporterName] !~ /(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)/ {
      grok {
        match => [ "[sflow.ReporterName]", "%{DATA:rep_host}(\.|\-)" ]
        add_field => { "[sflow.Reporter]" => "%{rep_host}" }
      }
    } else {
      mutate {
        add_field => [ "[sflow.Reporter]", "%{sflow.ReporterName}" ]
      }
    }
    # There are a bunch of fields from the original
    # message, as well as some I created temporarily,
    # that I don't care to store. Here's where I drop
    # them all.
    mutate {
      remove_field => [ "host", "command", "sflow.inputPort", "sflow.outputPort", "sflow.srcMAC", "sflow.dstMAC", "sflow.EtherType", "sflow.in_vlan", "sflow.out_vlan", "sflow.IPTTL", "sflow.IPSize", "message", "src_host", "dst_host", "rep_host", "SampleType", "domain" ]
    }
    # Lastly, we're going to want Kibana and
    # Elasticsearch to be able to do some math
    # with certain fields, so make sure they're
    # handled as numbers, rather than strings.
    # You can also do this by setting up templates
    # in Elasticsearch, but this is easier.
    mutate {
      convert => {
        "sflow.PacketSize" => "integer"
        "sflow.SampleRate" => "integer"
      }
    }
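    # (Optional) With those two fields numeric, a derived metric
    # could also be pre-computed in the pipeline. A hypothetical
    # extension (the field name is my own invention), written
    # against the Logstash 1.x/2.x event API of this config's era:
    #
    #   ruby {
    #     code => "event['sflow.EstBytes'] = event['sflow.PacketSize'].to_i * event['sflow.SampleRate'].to_i"
    #   }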
  }
}
Sometimes the flows are inverted (destination <-> source).
Would it be possible to automatically swap the fields when dstPort > 10000?
@Ay4shi, Using dstPort > 10000 as an inversion heuristic would not always work. If you're normalizing TCP conversations, try using their flags instead.
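For illustration, a flag-based swap might look something like this (an untested sketch using this gist's field names and the old event['...'] API; it would need to run early in the filter, before the hostname copies, and swaps the endpoints whenever the sampled packet is a SYN+ACK, i.e. travelling server -> client):

  ruby {
    code => "
      if event['sflow.tcpFlags'] == '0x12'
        event['sflow.srcIP'], event['sflow.dstIP'] = event['sflow.dstIP'], event['sflow.srcIP']
        event['sflow.srcPort'], event['sflow.dstPort'] = event['sflow.dstPort'], event['sflow.srcPort']
      end
    "
  }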
Can anyone help me install the logstash-codec-sflow plugin?
The latest version of Logstash doesn't ship with it.
Good advice. I'll incorporate that in the future.