Skip to content

Instantly share code, notes, and snippets.

@markuskont
Last active September 15, 2016 10:56
Show Gist options
  • Save markuskont/4e82accee7220814871038809d9799f5 to your computer and use it in GitHub Desktop.
Save markuskont/4e82accee7220814871038809d9799f5 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python
# A simple Python playground for experimenting with top-k (heavy-hitter) algorithms:
# process a data stream S and keep track of its K most frequent elements.
# Here the stream comes from syslog-ng: this script creates a Unix datagram socket
# and counts the syslog lines that syslog-ng writes to it.
# The syslog-ng template carries only the host and a few event fields and no
# timestamp, so repeated events produce identical lines that can be counted.
# This is a naive implementation built on native Python data structures
# (dictionaries), so it is only useful for testing, not for real data streams.
import socket
import os, os.path
import time
import re
# Filesystem path of the Unix datagram socket that syslog-ng delivers lines to.
SOCKET = '/tmp/sock'
# Summary size: how many distinct items the top-k counters may track.
K = 3
# Only lines containing "alert" with at least one character on each side
# are fed to the counters.
LINEPATTERN = re.compile(r'^.+alert.+$')
def FrequentAdd(counters, item, k):
    """Update a Misra-Gries ("Frequent") summary with one stream item.

    counters -- dict mapping item -> counter; updated in place.
    item     -- the stream element to count (must be hashable).
    k        -- maximum number of counters the summary may hold.

    If item is already tracked, its counter is incremented. If fewer than
    k items are tracked, item starts being tracked with a count of 1.
    Otherwise every tracked counter is decremented by one, counters that
    reach zero are dropped, and item itself is not added.

    Returns the same counters dict, for convenience.
    """
    if item in counters:
        counters[item] += 1
    elif len(counters) < k:
        # Still room in the summary: start tracking the new item.
        counters[item] = 1
        print("ADDED: %s" % (item,))
    else:
        # Summary is full: decrement every counter instead of adding item.
        # Iterate over a snapshot because entries are deleted along the way.
        for key, value in list(counters.items()):
            if value > 1:
                counters[key] = value - 1
            else:
                del counters[key]
                print("DROPPED: %s" % (key,))
    return counters
def SpaceSavingAdd(counters, item, k):
    """Update a Space-Saving summary with one stream item.

    counters -- dict mapping item -> (over-)estimated count; updated in place.
    item     -- the stream element to count (must be hashable).
    k        -- maximum number of items the summary may track.

    A tracked item has its count incremented. A new item is added with a
    count of 1 while fewer than k items are tracked. Once the summary is
    full, the item with the smallest count is evicted, and the newcomer
    inherits that count plus one, as the Space-Saving algorithm prescribes.
    Restarting at 1 would make every newcomer the next eviction victim.
    With k <= 0 and an empty summary, nothing is tracked.

    Returns the same counters dict, for convenience.
    """
    if item in counters:
        counters[item] += 1
    elif len(counters) < k:
        # Still room in the summary: start tracking the new item.
        counters[item] = 1
    elif counters:
        # Replace the least-counted item; the newcomer takes over its count.
        item_with_least_hits = min(counters, key=counters.get)
        least_hits = counters.pop(item_with_least_hits)
        counters[item] = least_hits + 1
        print("DROPPED: %s" % (item_with_least_hits,))
    return counters
def main():
    """Listen on SOCKET and keep Space-Saving top-K counters of alert lines.

    Removes a stale socket file, binds a Unix datagram socket at SOCKET, and
    feeds every received line matching LINEPATTERN into SpaceSavingAdd with
    size K. The loop stops on Ctrl-C or on a zero-length datagram. The
    socket is always closed and its file removed, even if an unexpected
    error occurs. On a normal stop, the final counters are printed from most
    to least frequent.
    """
    if os.path.exists(SOCKET):
        os.remove(SOCKET)
    print("Opening socket...")
    server = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
    counters = {}
    try:
        server.bind(SOCKET)
        print("Listening...")
        while True:
            # Datagram sockets discard any bytes beyond the buffer size,
            # so lines longer than 4096 bytes arrive truncated.
            datagram = server.recv(4096)
            if not datagram:
                # A zero-length datagram is treated as a stop signal.
                break
            # Python 3 returns bytes; the pattern is a str regex.
            if not isinstance(datagram, str):
                datagram = datagram.decode("utf-8", "replace")
            if LINEPATTERN.match(datagram):
                SpaceSavingAdd(counters, datagram, K)
    except KeyboardInterrupt:
        pass  # Ctrl-C is the normal way to stop the listener.
    finally:
        print("Shutting down...")
        server.close()
        if os.path.exists(SOCKET):
            os.remove(SOCKET)
    # Report the summary: the reason the counters were collected at all.
    for entry, hits in sorted(counters.items(), key=lambda kv: kv[1], reverse=True):
        print("%d\t%s" % (hits, entry))
    print("Done")


if __name__ == "__main__":
    main()
# Unused alternative output template, kept for reference: it would format the
# parsed .cee.* name-value pairs as one JSON object per line.
#template topk_processing { template("$(format-json --key .cee.*)\n"); };
# Messages whose program name is "suricata".
filter f_suricata {program("suricata")};
# JSON-parse the message; extracted fields are stored under the ".cee." prefix
# (e.g. ${.cee.event_type}, which the destination templates use).
parser p_cee { json-parser(prefix(".cee.")); };
# Messages matching the glob "@cee:". Not referenced by the log path in this file.
# NOTE(review): a glob without wildcards likely has to match the whole value, so this
# may only match a message that is exactly "@cee:"; "@cee:*" was probably intended. Confirm.
filter f_cee { match("@cee:" type(glob)); };
# Messages whose text matches "alert".
filter f_alert { message("alert"); };
# Strip a leading "@cee:" marker (and any following spaces) from MESSAGE so that
# what remains can be handed to the JSON parser.
rewrite r_cee { subst("^@cee: *", "", value("MESSAGE")); };
# Combined filter used by the top-k log path: Suricata messages that mention "alert".
filter f_topk {filter(f_suricata) and filter(f_alert)};
# Unused alternative: send the selected macros and all name-value pairs as JSON.
#destination d_unix_stream { unix-dgram("/tmp/sock" template("$(format-json --scope selected_macros --scope nv_pairs)\n")); };
# Send one datagram per message to /tmp/sock, the socket the Python listener binds.
# The line holds host, event type, source -> destination IP and alert signature,
# with no timestamp, so repeated alerts produce identical lines that can be counted.
# No trailing "\n": each message is already its own datagram.
# Note: despite its name, this destination uses unix-dgram, not a stream socket.
destination d_topk_unix_stream {
unix-dgram(
"/tmp/sock"
template("${HOST} ${.cee.event_type} ${.cee.src_ip} -> ${.cee.dest_ip} ${.cee.alert.signature}")
);
};
# The same line format appended to a local file, one message per line
# (useful for debugging; it is commented out in the log path).
destination d_topk_file {
file(
"/var/log/test.log"
template("${HOST} ${.cee.event_type} ${.cee.src_ip} -> ${.cee.dest_ip} ${.cee.alert.signature}\n")
);
};
# Unused alternative log path: every Suricata message to d_unix_stream.
#log { source(s_remote_ietf); filter(f_suricata); destination(d_unix_stream); };
# Top-k log path. The source s_remote_ietf is defined elsewhere (not shown).
# Keep Suricata alerts, strip the "@cee:" marker, parse the JSON payload, then
# forward the formatted line to the Python listener's Unix socket.
log {
source(s_remote_ietf);
filter(f_topk);
# The rewrite runs before the parser because a leading "@cee:" is not valid JSON.
rewrite(r_cee);
parser(p_cee);
destination(d_topk_unix_stream);
#destination(d_topk_file);
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment