Last active
September 15, 2016 10:56
-
-
Save markuskont/4e82accee7220814871038809d9799f5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# A simple Python playground for experimenting with top-k algorithms:
# process a data stream S and return its K most frequent elements.
# In my case, I create a unix socket and process a syslog stream from syslog-ng.
# The syslog template contains only host, program and message; no timestamp.
# This is a naive implementation built on native Python data structures
# (dictionaries), so it is only useful for testing, not for real data streams.
import socket | |
import os, os.path | |
import time | |
import re | |
# Filesystem path of the unix datagram socket that main() binds and reads from.
SOCKET = '/tmp/sock'

# Passed as ``k`` to the top-k routine; bounds how many distinct items are tracked.
K = 3

# Only datagrams matching this pattern are counted by main().
LINEPATTERN = re.compile(r'^.+alert.+$')
def FrequentAdd(counters, item, k):
    """Feed one stream item into a "Frequent" (Misra-Gries style) summary.

    The summary is updated in place:

    * an item that is already tracked has its counter incremented;
    * an untracked item is inserted with count 1 while fewer than ``k``
      items are tracked;
    * otherwise every tracked counter is decremented by one and counters
      that would reach zero are removed (the new item is not inserted).

    Args:
        counters: dict mapping item -> count; mutated in place.
        item: hashable stream element to account for.
        k: maximum number of items tracked at once.

    Returns:
        The same ``counters`` dict, for convenience.
    """
    if item in counters:
        counters[item] += 1
    elif len(counters) < k:
        # Strictly-less-than keeps at most k counters, matching SpaceSavingAdd.
        counters[item] = 1
        print("ADDED: %s" % item)
    else:
        # Iterate over a snapshot so entries can be deleted during the loop.
        for key, value in list(counters.items()):
            if value > 1:
                counters[key] = value - 1
            else:
                del counters[key]
                print("DROPPED: %s" % key)
    return counters
def SpaceSavingAdd(counters, item, k):
    """Feed one stream item into a Space-Saving top-k summary.

    The summary is updated in place:

    * an item that is already tracked has its counter incremented;
    * an untracked item is inserted with count 1 while fewer than ``k``
      items are tracked;
    * otherwise the tracked item with the smallest count is evicted and the
      new item takes its place with ``evicted_count + 1``.

    Args:
        counters: dict mapping item -> count; mutated in place.
        item: hashable stream element to account for.
        k: maximum number of items tracked at once.

    Returns:
        The same ``counters`` dict, for convenience. When ``k`` leaves no
        room for any counter (``k <= 0`` with an empty dict) it is returned
        unchanged.
    """
    if item in counters:
        counters[item] += 1
    elif len(counters) < k:
        counters[item] = 1
    elif counters:
        # On ties min() yields the first minimal key in iteration order.
        item_with_least_hits = min(counters, key=counters.get)
        least_hits = counters.pop(item_with_least_hits)
        # The newcomer inherits the evicted count (+1 for this occurrence);
        # restarting at 1 would make it the next eviction victim right away.
        counters[item] = least_hits + 1
        print("DROPPED: %s" % item_with_least_hits)
    return counters
def main():
    """Bind the unix datagram socket and count matching datagrams until stopped.

    Any stale socket file at SOCKET is removed first. Each received datagram
    that matches LINEPATTERN is fed to SpaceSavingAdd with capacity K. The
    loop ends on an empty datagram or on Ctrl-C (KeyboardInterrupt). The
    socket is closed and its file removed on every exit path, including
    unexpected errors, which are still propagated to the caller.
    """
    if os.path.exists(SOCKET):
        os.remove(SOCKET)

    print("Opening socket...")
    server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    try:
        server.bind(SOCKET)
        counters = {}
        print("Listening...")
        while True:
            datagram = server.recv(4096)
            if not datagram:
                break
            # recv() yields bytes on Python 3, which a str regex cannot match;
            # on Python 2 it is already a str and is left untouched.
            if not isinstance(datagram, str):
                datagram = datagram.decode("utf-8", "replace")
            if LINEPATTERN.match(datagram):
                SpaceSavingAdd(counters, datagram, K)
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the listener.
        pass
    finally:
        print("Shutting down...")
        server.close()
        # The file is absent if bind() itself failed.
        if os.path.exists(SOCKET):
            os.remove(SOCKET)
    print("Done")


if __name__ == "__main__":
    main()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#template topk_processing { template"$(format-json --key .cee.*)n"); };

# Select messages whose program name matches "suricata".
filter f_suricata {program("suricata")};

# Parse the message as JSON; the templates below read the parsed fields
# under the ".cee." prefix.
parser p_cee { json-parser(prefix(".cee.")); };

# Glob match on the "@cee:" marker.
# NOTE(review): not referenced by the log path in this file.
filter f_cee { match("@cee:" type(glob)); };

# Select messages whose text matches "alert".
filter f_alert { message("alert"); };

# Strip the leading "@cee:" marker (and following spaces) from MESSAGE so
# that the remainder can be handed to the JSON parser.
rewrite r_cee { subst("^@cee: *", "", value("MESSAGE")); };

# Combined filter used by the log path: suricata messages that mention "alert".
filter f_topk {filter(f_suricata) and filter(f_alert)};

#destination d_unix_stream { unix-dgram("/tmp/sock" template("$(format-json --scope selected_macros --scope nv_pairs)\n")); };

# Send one formatted line per event to the unix datagram socket at /tmp/sock,
# the same path the Python script binds.
destination d_topk_unix_stream {
    unix-dgram(
        "/tmp/sock"
        template("${HOST} ${.cee.event_type} ${.cee.src_ip} -> ${.cee.dest_ip} ${.cee.alert.signature}")
    );
};

# Same fields written to a file, newline-terminated.
# Its use in the log path below is commented out.
destination d_topk_file {
    file(
        "/var/log/test.log"
        template("${HOST} ${.cee.event_type} ${.cee.src_ip} -> ${.cee.dest_ip} ${.cee.alert.signature}\n")
    );
};

#log { source(s_remote_ietf); filter(f_suricata); destination(d_unix_stream); };

# Processing order: filter, strip the @cee: marker, parse JSON, then deliver.
# NOTE(review): source s_remote_ietf is not defined in this file; presumably
# it is declared elsewhere in the syslog-ng configuration.
log {
    source(s_remote_ietf);
    filter(f_topk);
    rewrite(r_cee);
    parser(p_cee);
    destination(d_topk_unix_stream);
    #destination(d_topk_file);
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment