Last active
August 29, 2015 14:23
-
-
Save hillar/c91709b127df4284de53 to your computer and use it in GitHub Desktop.
test unified2 reader, see idstools, idiokit, abusehelper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
! testing without XMPP server | |
for real life use: | |
from idstools import unified2 | |
import idiokit | |
from abusehelper.core import bot | |
class Unified2Bot(bot.FeedBot): | |
unified2_dir = bot.Param(""" | |
Path to unified2 spool directory | |
default: %default | |
""", default="/var/log/suricata") | |
unified2_prefix = bot.Param(""" | |
Filename prefix for unified2 log files | |
default: %default | |
""", default="unified2") | |
@idiokit.stream | |
def feed(self): | |
reader = unified2.SpoolEventReader(self.unified2_dir,self.unified2_prefix,follow = True) | |
while True: | |
event = yield idiokit.thread(reader.next) | |
if event is None: | |
break | |
for key in event: | |
print key, event[key] | |
if __name__ == "__main__": | |
Unified2Bot.from_command_line().execute() | |
""" | |
from idstools import unified2 | |
import idiokit | |
from abusehelper.core import bot | |
class TestUnified2Bot(bot.Bot): | |
unified2_dir = bot.Param(""" | |
Path to unified2 spool directory | |
default: %default | |
""", default="/var/log/suricata") | |
unified2_prefix = bot.Param(""" | |
Filename prefix for unified2 log files | |
default: %default | |
""", default="unified2") | |
def __init__(self, *args, **keys): | |
bot.Bot.__init__(self, *args, **keys) | |
@idiokit.stream | |
def _run(self): | |
reader = unified2.SpoolEventReader(self.unified2_dir,self.unified2_prefix,follow = True) | |
while True: | |
event = yield idiokit.thread(reader.next) | |
if event is None: | |
break | |
for key in event: | |
print key, event[key] | |
def run(self): | |
@idiokit.stream | |
def throw_stop_on_signal(): | |
try: | |
yield idiokit.consume() | |
except idiokit.Signal: | |
raise services.Stop() | |
return idiokit.main_loop(throw_stop_on_signal() | self._run()) | |
@idiokit.stream | |
def main(self): | |
yield idiokit.consume() | |
if __name__ == "__main__": | |
TestUnified2Bot.from_command_line().execute() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment