Skip to content

Instantly share code, notes, and snippets.

@itayw
Last active October 15, 2015 09:42
Show Gist options
  • Save itayw/df100df36f6d0a8cba36 to your computer and use it in GitHub Desktop.
Save itayw/df100df36f6d0a8cba36 to your computer and use it in GitHub Desktop.
CEF parsing
require "logstash/codecs/base"
# Codec for Common Event Format (CEF) lines.
#
# A CEF line has seven pipe-delimited header fields followed by an extension
# section of space-separated key=value pairs:
#
#   CEF:Version|Vendor|Product|DeviceVersion|SignatureID|Name|Severity|Extension
#
# `decode` turns such a line into an event carrying `cef_*` fields (plus
# `syslog` when a prefix precedes the CEF marker and `cef_ext` when extensions
# are present). `encode` writes a CEF line from the configured header values
# and the configured list of event fields.
class LogStash::Codecs::CEF < LogStash::Codecs::Base
  config_name "cef"

  # Values placed in the Signature ID, Name and Severity header slots by encode.
  config :signature, :validate => :string, :default => "Logstash"
  config :name, :validate => :string, :default => "Logstash"
  config :sev, :validate => :number, :default => 6

  # Names of the event fields that encode emits as key=value extensions.
  config :fields, :validate => :array

  # A pipe that is not preceded by a backslash (i.e. an unescaped separator).
  HEADER_SEPARATOR = /(?<!\\)[\|]/

  # " key=" inside the extension section; the capture keeps the key in the
  # array returned by String#split.
  EXTENSION_KEY = / ([\w\.]+)=/

  # Header fields plus the extension section.
  CEF_PART_COUNT = 8

  public

  def initialize(params={})
    super(params)
  end

  # Parses one CEF line and yields the resulting event.
  #
  # @param data [String] raw line, optionally wrapped in double quotes
  # @yield [event] the LogStash::Event built from the line
  def decode(data)
    # Strip any quotations at the start and end, flex connectors seem to send this.
    # The trailing character is only removed when it really is a quote.
    if data.start_with?("\"")
      data = data[1..-1].chomp("\"")
    end

    # Split by the pipes. The limit keeps unescaped pipes that appear inside
    # the extension section instead of truncating the extensions there.
    version, vendor, product, device_version, sigid, event_name, severity, message =
      data.split(HEADER_SEPARATOR, CEF_PART_COUNT)

    # Try and parse out the syslog header if there is one. `version` is nil
    # for empty input, so guard before inspecting it.
    syslog = nil
    if version && version.include?(' ')
      syslog, _separator, version = version.rpartition(' ')
    end

    # Get rid of the CEF bit in the version
    version = version.sub(/^CEF:\s?/, '') if version

    event = LogStash::Event.new
    event['cef_version'] = version
    event['cef_vendor'] = vendor
    event['cef_product'] = product
    event['cef_device_version'] = device_version
    event['cef_sigid'] = sigid
    event['cef_name'] = event_name
    event['cef_severity'] = severity
    event['syslog'] = syslog if syslog

    # Save the parsed key value pairs as the extensions
    if message && message.include?('=')
      event['cef_ext'] = parse_extensions(message)
    end

    yield event
  end

  # Builds a CEF line from the configured header values and fields and hands
  # it to the @on_event callback.
  #
  # @param data [LogStash::Event] event whose configured fields are emitted
  def encode(data)
    # "CEF:0|Elasticsearch|Logstash|1.0|Signature|Name|Sev|"
    # TODO: Need to check that fields are set!
    # Signature, Name, and Sev should be set in the config, with ref to fields
    # Should also probably set the fields sent
    header = ["CEF:0", "Elasticsearch", "Logstash", "1.0", @signature, @name, @sev].join("|")

    # `fields` has no default, so treat an unset value as "no extensions".
    values = (@fields || []).map { |field| get_value(field, data) }.join(" ")

    # The extension section is separated from the header by a pipe, which is
    # also what decode splits on.
    @on_event.call("#{header}|#{values}\n")
  end

  private

  # Splits the extension section into a Hash of key => value strings.
  #
  # @param message [String] extension section containing at least one '='
  # @return [Hash] parsed extensions
  def parse_extensions(message)
    # Strip any whitespace from the message
    message = message.strip

    # If the last KVP has no value, pad it; otherwise split would drop the
    # trailing empty value and leave the last key without a partner.
    message += ' ' if message.end_with?("=")

    parts = message.split(EXTENSION_KEY)
    extensions = {}

    # The first pair has no leading space, so it is not matched by
    # EXTENSION_KEY and arrives as a single "key=value" string.
    key, value = parts.shift.split('=', 2)
    extensions[key] = value

    # The rest alternates key, value, key, value, ...
    parts.each_slice(2) { |k, v| extensions[k] = v }
    extensions
  end

  # Renders one event field as "name=value".
  #
  # Hashes are serialised with to_json; every other value (including numbers
  # and nil) is converted with to_s via interpolation.
  #
  # @param name [String] field name
  # @param event [LogStash::Event] event to read the field from
  # @return [String]
  def get_value(name, event)
    val = event[name]
    case val
    when Hash
      "#{name}=#{val.to_json}"
    else
      "#{name}=#{val}"
    end
  end
end
# NOTE(review): these look like Elasticsearch node settings (elasticsearch.yml)
# in flat dotted-key form — confirm each key against the target version.

# Segment merge and translog settings.
index.merge.scheduler.max_thread_count: 1
index.merge.policy.reclaim_deletes_weight: 0.0
index.translog.flush_threshold_size: 1gb
# Refresh interval override, currently disabled.
#index.refresh_interval: 1s

# Index buffer sizes.
indices.memory.index_buffer_size: 512mb
indices.memory.min_shard_index_buffer_size: 12mb
indices.memory.min_index_buffer_size: 96mb

# Field data and filter caches: size limit and expiry for each.
indices.fielddata.cache.size: 15%
indices.fielddata.cache.expire: 6h
indices.cache.filter.size: 15%
indices.cache.filter.expire: 6h

# Thread pools: type for search; type, size and queue size for bulk and index.
threadpool.search.type: cached
threadpool.bulk.type: fixed
threadpool.bulk.size: 24
threadpool.bulk.queue_size: 500
threadpool.index.type: fixed
threadpool.index.size: 16
threadpool.index.queue_size: 500

# JVM GC monitoring thresholds (warn / info / debug) for young and old collections.
monitor.jvm.gc.young.warn: 1000ms
monitor.jvm.gc.young.info: 700ms
monitor.jvm.gc.young.debug: 400ms
monitor.jvm.gc.old.warn: 10s
monitor.jvm.gc.old.info: 5s
monitor.jvm.gc.old.debug: 2s

# Scripting: inline, indexed and file scripts switched on, globally and for the
# javascript engine, with javascript as the default language.
# NOTE(review): script.disable_dynamic: false alongside the per-type switches
# presumably leaves dynamic scripting enabled — confirm this is intended and
# that the node is not reachable by untrusted clients.
script.inline: on
script.indexed: on
script.file: on
script.default_lang: javascript
script.engine.javascript.inline: on
script.engine.javascript.indexed: on
script.engine.javascript.file: on
script.disable_dynamic: false
# Indexer pipeline: reads events from the Redis list 'logstash:redis', prints
# each one to stdout with the rubydebug codec and indexes it into the
# 'ceftest' Elasticsearch index.
input {
  redis {
    host      => 'localhost'
    data_type => 'list'
    key       => 'logstash:redis'
    type      => 'redis-input'
  }
}

# Disabled alternative input: read lines from stdin and decode them with the
# cef codec.
#input { stdin { type => "producer"
#codec => 'cef'
#} }

# No filters are applied.
filter {
}

output {
  stdout {
    codec => rubydebug
  }

  elasticsearch {
    action   => 'index'
    index    => 'ceftest'
    protocol => 'http'
  }
}
# Producer pipeline: decodes lines read from stdin with the cef codec and
# pushes the resulting events onto the Redis list 'logstash:redis'.
input {
  stdin {
    type  => "producer"
    codec => cef
  }
}

output {
  redis {
    host      => 'localhost'
    data_type => 'list'
    key       => 'logstash:redis'
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment