Skip to content

Instantly share code, notes, and snippets.

@mikluko
Created March 11, 2011 08:03
Show Gist options
  • Save mikluko/865600 to your computer and use it in GitHub Desktop.
Save mikluko/865600 to your computer and use it in GitHub Desktop.
multiprocess version of ulog-acctd logs parser
#!/usr/bin/env python
'''
/etc/shorewall/rules::
SECTION ESTABLISHED
LOG:ULOG all net
LOG:ULOG net all
SECTION RELATED
LOG:ULOG all net
LOG:ULOG net all
SECTION NEW
LOG:ULOG all net
LOG:ULOG net all
/etc/ulog-acctd.conf::
multicast groups=1
accounting file=/var/log/ulog-acctd/account.log
dump file=/var/log/ulog-acctd/dump
debug file=/var/log/ulog-acctd/debug.log
debug = syscall, misc, statistics, error, asdf, error-packet
accounting format="%t\t%s\t%d\t%b\n"
empty interface="-"
empty prefix="-"
flush=300
fdelay=0
# apt-get install python-iplib python-prettytable python-argparse python-dateutil
'''
from dateutil.parser import parse as _parse_date
from dateutil.relativedelta import relativedelta as rdelta, MO
from dateutil.tz import tzlocal
from functools import partial
from iplib import IPv4Address, CIDR
from itertools import chain, izip
from prettytable import PrettyTable, FRAME, NONE
import argparse
import csv
import datetime as dt
import gzip
import socket
import sys
# Networks treated as "local" when no --local option is given (see main()).
DEFAULT_LOCAL_NETS = [CIDR('192.168.0.0/16')]
# NOTE(review): not referenced in the visible code; main() declares its own
# default of 4 for --concurrency. Confirm whether this constant is still used.
CONCURRENCY = 3
# Reporting periods accepted by --period.
PERIOD_DAY, PERIOD_WEEK, PERIOD_MONTH = 'day', 'week', 'month'
CHOICES_PERIOD = frozenset([PERIOD_DAY, PERIOD_WEEK, PERIOD_MONTH])
DEFAULT_PERIOD = PERIOD_DAY
# Output formats accepted by --format.
FORMAT_TABLE, FORMAT_CSV = 'table', 'csv'
CHOICES_FORMAT = frozenset([FORMAT_TABLE, FORMAT_CSV])
DEFAULT_FORMAT = FORMAT_TABLE
# Column names of the table report. The leading spaces in ' in' / ' out' are
# part of the header text -- NOTE(review): presumably for visual alignment of
# the right-aligned columns; confirm before changing.
FIELD_HOST, FIELD_IN, FIELD_OUT = 'host', ' in', ' out'
# (threshold, format) pairs used by units(), largest first: a byte count is
# divided by the first threshold it reaches and printed with one decimal.
UNITS = [
    (1024 ** 4, '%.1fT'),
    (1024 ** 3, '%.1fG'),
    (1024 ** 2, '%.1fM'),
    (1024 ** 1, '%.1fK'),
]
def iter_rows(files):
    """Yield every line of every file in *files*, in order.

    Lines are streamed: a file is not read (or even taken from *files*)
    until the lines of the preceding files have been consumed. This replaces
    calling readlines() on every input up front.

    :param files: iterable of open file-like objects.
    """
    for line in chain.from_iterable(files):
        yield line
def iter_rows_curry(files, **kwargs):
    """Pair every input line with the same keyword-argument dict.

    main() uses this on Python < 2.7 instead of functools.partial (presumably
    because partial objects could not be pickled for the pool there);
    report_map_uncurry splits each pair back into a report_map call.
    """
    return ((row, kwargs) for row in iter_rows(files))
def report_map(line, start=None, end=None, local_nets=None):
    """Attribute one accounting-log line to a local host.

    *line* holds four whitespace-separated fields: timestamp, source IP,
    destination IP and byte count (the ``%t %s %d %b`` accounting format).

    :param start: inclusive lower bound on the integer timestamp, or None.
    :param end: exclusive upper bound on the integer timestamp, or None.
    :param local_nets: containers tested with ``ip in net``; defaults to
        DEFAULT_LOCAL_NETS.
    :returns: ``(local_ip, [bytes_in, bytes_out])`` for traffic between a
        local and a non-local address, otherwise ``(None, None)``.
    """
    if local_nets is None:
        local_nets = DEFAULT_LOCAL_NETS
    fields = line.split()
    if len(fields) != 4:
        # Blank or truncated line: skip it instead of crashing the whole map.
        return None, None
    ts, src, dst, size = fields
    ts_int = int(ts)
    # Half-open [start, end): main() sets end to the start of the *next*
    # period, so a record stamped exactly at end belongs to that period.
    if (start is not None and ts_int < start) or (end is not None and ts_int >= end):
        return None, None
    src_local = any(src in net for net in local_nets)
    dst_local = any(dst in net for net in local_nets)
    if src_local and not dst_local:
        return src, [0, int(size)]
    if dst_local and not src_local:
        return dst, [int(size), 0]
    return None, None
def report_map_uncurry(a):
    """Call report_map with a ``(line, kwargs)`` pair built by iter_rows_curry."""
    line, options = a[0], a[1]
    return report_map(line, **options)
def report_partition(mapping):
    """Group report_map output by IP address.

    Pairs whose IP is None (lines report_map rejected) are dropped. Returns
    the grouping dict's ``items()``: ``(ip, [traffic, traffic, ...])`` pairs,
    with each IP's traffic entries kept in input order.
    """
    grouped = {}
    for ip, traffic in mapping:
        if ip is not None:
            grouped.setdefault(ip, []).append(traffic)
    return grouped.items()
def report_reduce(items):
    """Sum all traffic pairs collected for one IP.

    :param items: ``(ip, [[in, out], [in, out], ...])`` as produced by
        report_partition.
    :returns: ``(ip, [total_in, total_out])``.
    """
    ip, traffic = items[0], items[1]
    # zip(*traffic) transposes the pairs into per-direction columns. An
    # explicit list is returned because callers index it (Python 3's map()
    # would be a lazy iterator), and zip() avoids the Python-2-only izip.
    return ip, [sum(column) for column in zip(*traffic)]
def resolve(items):
    """Replace the IP in each ``(ip, traffic)`` pair with its reverse-DNS name.

    Host names are lower-cased. When the reverse lookup fails the IP itself is
    kept, so exactly one pair is yielded per input pair.
    """
    for ip, traffic in items:
        try:
            host = socket.gethostbyaddr(ip)[0].lower()
        except (socket.herror, socket.gaierror):
            # herror: no PTR record; gaierror: the address could not be parsed
            # or resolved at all. Either way fall back to the raw IP rather
            # than aborting the whole report.
            host = ip
        yield host, traffic
def units(bytes):
    """Format a byte count as a short human-readable string.

    The value is divided by the first (largest-first) threshold in UNITS that
    it reaches and rendered with that entry's format, e.g. ``'1.5K'``. Zero,
    and values below every threshold, are returned unscaled with a trailing
    space: ``'<bytes> '``.
    """
    amount = float(bytes)
    if not amount:
        return '%s ' % bytes
    scaled = next((fmt % (amount / scale) for scale, fmt in UNITS if amount >= scale), None)
    if scaled is None:
        return '%s ' % bytes
    return scaled
def parse_date(s):
    """Parse a free-form date string (argparse ``--when``) into a ``date``.

    Only the calendar date of the parsed value is returned; any time-of-day or
    time-zone information is discarded by ``.date()``.
    """
    parsed = _parse_date(s, tzinfos={None: tzlocal()})
    return parsed.date()
def parse_cidr(s):
    """Parse a ``--local`` argument into a CIDR.

    A bare address without a ``/prefix`` is treated as a single host (``/32``).
    """
    # Conditional expression instead of the fragile ``cond and a or b`` idiom.
    cidr = s if '/' in s else '%s/32' % s
    return CIDR(cidr)
class Table(PrettyTable):
    """PrettyTable that can render one extra footer row (used for totals).

    NOTE(review): relies on the old PrettyTable API this script was written
    for (positional ``get_string`` arguments and the private
    ``_stringify_row`` / ``_stringify_hrule`` helpers) -- confirm against the
    installed prettytable version.
    """

    def get_string(self, start=0, end=None, fields=None,
                   header=True, border=True, hrules=FRAME, sortby=None, reversesort=False):
        """Render the table, appending the footer row for full renderings.

        The footer (followed by a closing rule unless ``hrules`` is NONE) is
        added only when a header is drawn, no ``start``/``end`` slice was
        requested, and set_footer() has been called.
        """
        string = PrettyTable.get_string(self, start, end, fields, header, border, hrules, sortby, reversesort)
        # Without this guard, rendering before set_footer() raised AttributeError.
        footer = getattr(self, 'footer', None)
        if footer is not None and header and not start and not end:
            string += '\n' + self._stringify_footer(fields, border, hrules)
            if hrules != NONE:
                string += '\n' + self._stringify_hrule(fields, border)
        return string

    def set_footer(self, footer):
        """Set the footer row: one value per column, like an add_row() row."""
        self.footer = footer

    def _stringify_footer(self, fields=None, border=True, hrules=FRAME):
        """Render the footer row with the base class's row formatter."""
        return self._stringify_row(self.footer, fields, border, hrules)
class FileType(argparse.FileType):
    """argparse file type that opens ``*.gz`` names with gzip.

    Any other name (including ``-`` for stdin) is handled by
    argparse.FileType unchanged.
    """

    def __call__(self, string):
        if string.endswith('.gz'):
            try:
                return gzip.open(string)
            except IOError as e:
                # Mirror argparse.FileType: an unopenable file is a usage error
                # reported by the parser, not an uncaught traceback.
                raise argparse.ArgumentTypeError("can't open '%s': %s" % (string, e))
        return super(FileType, self).__call__(string)
class PoolMock(object):
    """Serial stand-in for multiprocessing.Pool, used when --concurrency <= 1."""

    def map(self, *args):
        """Apply a function like the builtin map() and return a list.

        multiprocessing.Pool.map returns a list and main() iterates the result
        more than once; on Python 3 the builtin map() is a one-shot iterator,
        so the result is materialized here.
        """
        return list(map(*args))
def _period_bounds(when, period):
    """Return the ``(start, end)`` dates of the *period* containing *when*.

    ``end`` is the first day of the following period (half-open range).
    A week starts on the Monday on or before *when*; a month on its 1st.
    """
    if period == PERIOD_DAY:
        start = when
        return start, start + rdelta(days=1)
    if period == PERIOD_WEEK:
        start = when + rdelta(weekday=MO(-1))
        return start, start + rdelta(weeks=1)
    if period == PERIOD_MONTH:
        start = when + rdelta(day=1)
        return start, start + rdelta(months=1)
    raise Exception('period argument error')


def _local_epoch(day):
    """Return the Unix timestamp of local midnight at the start of *day*."""
    import time
    # time.mktime() interprets the tuple as local time, which is what the
    # glibc-only strftime('%s') extension did, but it works on every platform.
    return int(time.mktime(day.timetuple()))


def _print_table(result):
    """Print ``(host, [in, out])`` rows as a table sorted by host, with totals."""
    table = Table([FIELD_HOST, FIELD_IN, FIELD_OUT])
    table.set_field_align(FIELD_HOST, 'l')
    table.set_field_align(FIELD_IN, 'r')
    table.set_field_align(FIELD_OUT, 'r')
    for host, traffic in result:
        table.add_row([host, units(traffic[0]), units(traffic[1])])
    # Explicit sums give 0/0 for an empty period; the old column-wise
    # map(sum, zip(...)) produced [] there and crashed on totals[0].
    total_in = sum(traffic[0] for _, traffic in result)
    total_out = sum(traffic[1] for _, traffic in result)
    table.set_footer(['', units(total_in), units(total_out)])
    table.printt(sortby=FIELD_HOST)


def _write_csv(result):
    """Write ``(host, [in, out])`` rows to stdout as CSV with a header row."""
    writer = csv.writer(sys.stdout)
    writer.writerow(['host', 'in', 'out'])
    for host, traffic in result:
        writer.writerow([host] + list(traffic))


def main():
    """Command-line entry point: report per-host traffic for one period.

    Reads ulog-acctd accounting lines from the given files (or stdin), keeps
    those inside the selected day/week/month, sums in/out bytes per local
    host and prints the result as a table or CSV.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--concurrency', type=int, default=4)
    parser.add_argument('-f', '--format', action='store', default=DEFAULT_FORMAT, choices=CHOICES_FORMAT)
    parser.add_argument('-l', '--local', action='append', metavar='CIDR', dest='local_nets', type=parse_cidr)
    parser.add_argument('-p', '--period', default=DEFAULT_PERIOD, choices=CHOICES_PERIOD)
    parser.add_argument('-r', '--resolve', action='store_true', default=False)
    parser.add_argument('-w', '--when', default=dt.date.today(), type=parse_date, metavar='ISO_DATE')
    parser.add_argument('files', type=FileType(), metavar='FILENAME', nargs='*', default=[sys.stdin])
    args = parser.parse_args(sys.argv[1:])

    start, end = _period_bounds(args.when, args.period)

    if args.concurrency > 1:
        from multiprocessing import Pool
        pool = Pool(processes=args.concurrency)
    else:
        pool = PoolMock()
    mapargs = {
        'local_nets': args.local_nets or DEFAULT_LOCAL_NETS,
        'start': _local_epoch(start),
        'end': _local_epoch(end),
    }
    if sys.version_info > (2, 7):
        mapping = pool.map(partial(report_map, **mapargs), iter_rows(args.files))
    else:
        mapping = pool.map(report_map_uncurry, iter_rows_curry(args.files, **mapargs))
    # Materialized: the table output iterates the result twice.
    result = list(pool.map(report_reduce, report_partition(mapping)))
    if args.concurrency > 1:
        # All parallel work is done; release the worker processes.
        pool.close()
        pool.join()

    if args.resolve:
        result = list(resolve(result))
    if args.format == FORMAT_TABLE:
        _print_table(result)
    elif args.format == FORMAT_CSV:
        _write_csv(result)
# Run the report when the file is executed as a script.
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # Ctrl-C ends the run quietly: no traceback is printed and execution
        # simply falls off the end of the script.
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment