Created
March 11, 2011 08:03
-
-
Save mikluko/865600 to your computer and use it in GitHub Desktop.
multiprocess version of ulog-acctd logs parser
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
''' | |
/etc/shorewall/rules:: | |
SECTION ESTABLISHED | |
LOG:ULOG all net | |
LOG:ULOG net all | |
SECTION RELATED | |
LOG:ULOG all net | |
LOG:ULOG net all | |
SECTION NEW | |
LOG:ULOG all net | |
LOG:ULOG net all | |
/etc/ulog-acctd.conf:: | |
multicast groups=1 | |
accounting file=/var/log/ulog-acctd/account.log | |
dump file=/var/log/ulog-acctd/dump | |
debug file=/var/log/ulog-acctd/debug.log | |
debug = syscall, misc, statistics, error, asdf, error-packet | |
accounting format="%t\t%s\t%d\t%b\n" | |
empty interface="-" | |
empty prefix="-" | |
flush=300 | |
fdelay=0 | |
# apt-get install python-iplib python-prettytable python-argparse python-dateutil | |
''' | |
from dateutil.parser import parse as _parse_date | |
from dateutil.relativedelta import relativedelta as rdelta, MO | |
from dateutil.tz import tzlocal | |
from functools import partial | |
from iplib import IPv4Address, CIDR | |
from itertools import chain, izip | |
from prettytable import PrettyTable, FRAME, NONE | |
import argparse | |
import csv | |
import datetime as dt | |
import gzip | |
import socket | |
import sys | |
# Networks counted as "local" when no -l/--local CIDR is given on the command line.
DEFAULT_LOCAL_NETS = [CIDR('192.168.0.0/16')]
# NOTE(review): apparently the intended default worker count, but main() hard-codes
# default=4 for -c/--concurrency and never reads this constant -- confirm which wins.
CONCURRENCY = 3
# Reporting period names accepted by -p/--period.
PERIOD_DAY, PERIOD_WEEK, PERIOD_MONTH = 'day', 'week', 'month'
CHOICES_PERIOD = frozenset([PERIOD_DAY, PERIOD_WEEK, PERIOD_MONTH])
DEFAULT_PERIOD = PERIOD_DAY
# Output format names accepted by -f/--format.
FORMAT_TABLE, FORMAT_CSV = 'table', 'csv'
CHOICES_FORMAT = frozenset([FORMAT_TABLE, FORMAT_CSV])
DEFAULT_FORMAT = FORMAT_TABLE
# Table column headers; the leading space in ' in'/' out' pads the table layout.
FIELD_HOST, FIELD_IN, FIELD_OUT = 'host', ' in', ' out'
# (threshold, format) pairs for units(), ordered largest first.
UNITS = [
    (1024 ** 4, '%.1fT'),
    (1024 ** 3, '%.1fG'),
    (1024 ** 2, '%.1fM'),
    (1024 ** 1, '%.1fK'),
]
def iter_rows(files):
    """Lazily yield raw accounting-log lines from each open file, in order.

    files -- iterable of file-like objects (plain or gzip).

    The original implementation called f.readlines() on every file up front,
    materializing all logs in memory before the first line was processed;
    iterating the file objects directly streams them instead, with identical
    output.
    """
    for f in files:
        for line in f:
            yield line
def iter_rows_curry(files, **kwargs):
    """Pair every log line with *kwargs*.

    Fallback for interpreters whose Pool.map cannot ship a functools.partial
    to workers: the keyword arguments ride along with each line and are
    re-applied by report_map_uncurry.
    """
    extra = kwargs
    for row in iter_rows(files):
        yield row, extra
def report_map(line, start=None, end=None, local_nets=None):
    """Map one accounting-log line to (local_ip, [bytes_in, bytes_out]).

    line -- "timestamp src dst bytes", as written by ulog-acctd with the
        accounting format "%t\\t%s\\t%d\\t%b\\n".
    start, end -- optional epoch-second bounds; lines outside the window map
        to (None, None).
    local_nets -- containers supporting `ip in net`; defaults to
        DEFAULT_LOCAL_NETS. (Resolved lazily instead of as a mutable default
        argument evaluated at def time.)

    Only traffic crossing the local/remote boundary is counted, keyed by the
    local address: remote->local fills slot 0 (in), local->remote fills
    slot 1 (out). Purely local or purely remote lines map to (None, None).
    """
    if local_nets is None:
        local_nets = DEFAULT_LOCAL_NETS

    def check_local(ip):
        return any(ip in net for net in local_nets)

    # nbytes (not `bytes`) so the builtin is not shadowed.
    ts, src, dst, nbytes = line.split()
    ts_int = int(ts)
    if (start and ts_int < start) or (end and ts_int > end):
        return None, None
    if check_local(src) and not check_local(dst):
        # local -> remote: outbound, keyed by the local source address
        return src, [0, int(nbytes)]
    if check_local(dst) and not check_local(src):
        # remote -> local: inbound, keyed by the local destination address
        return dst, [int(nbytes), 0]
    return None, None
def report_map_uncurry(a):
    """Unpack a (line, kwargs) pair from iter_rows_curry and apply report_map."""
    line, kwargs = a
    return report_map(line, **kwargs)
def report_partition(mapping):
    """Group per-line traffic pairs by IP address.

    mapping -- iterable of (ip, [bytes_in, bytes_out]) tuples; entries whose
        key is None (filtered lines) are skipped.

    Returns (ip, list-of-traffic-pairs) items ready for report_reduce.
    """
    grouped = {}
    for ip, traffic in mapping:
        if ip is not None:
            grouped.setdefault(ip, []).append(traffic)
    return grouped.items()
def report_reduce(items):
    """Sum a host's traffic pairs column-wise.

    items -- (ip, [[in1, out1], [in2, out2], ...]) as produced by
        report_partition.

    Returns (ip, [total_in, total_out]). A list comprehension over the
    builtin zip replaces map(sum, izip(...)): identical on Python 2 (where
    map returned a list) and guarantees an indexable list on Python 3,
    which callers rely on via traffic[0]/traffic[1].
    """
    key, rows = items
    return key, [sum(column) for column in zip(*rows)]
def resolve(items):
    """Replace IP addresses with lowercase reverse-DNS names where available.

    items -- iterable of (ip, traffic) pairs.

    Yields (hostname, traffic); addresses without a PTR record pass through
    unchanged.
    """
    for ip, traffic in items:
        try:
            name = socket.gethostbyaddr(ip)[0]
        except socket.herror:
            # No PTR record -- keep the bare address.
            yield ip, traffic
        else:
            yield name.lower(), traffic
def units(bytes, unit_table=None):
    """Format a byte count for human consumption.

    bytes -- integer byte count.
    unit_table -- optional list of (threshold, format) pairs ordered largest
        first; defaults to the module-level UNITS table. (New optional
        parameter; existing one-argument callers are unaffected.)

    Returns e.g. '1.5K'. Zero and values below the smallest threshold are
    returned verbatim with a trailing space so table columns still align
    with the one-letter suffixes.
    """
    if unit_table is None:
        unit_table = UNITS
    value = float(bytes)
    if value:
        for threshold, fmt in unit_table:
            if value >= threshold:
                return fmt % (value / threshold)
    return '%s ' % bytes
def parse_date(s):
    """Parse a date string (argparse type for -w), treating naive input as local time."""
    parsed = _parse_date(s, tzinfos={None: tzlocal()})
    return parsed.date()
def parse_cidr(s):
    """Parse 'a.b.c.d' or 'a.b.c.d/nn' into a CIDR (argparse type for -l).

    Bare addresses become /32 host networks. The original used the
    error-prone `cond and a or b` idiom; a conditional expression is the
    equivalent, idiomatic form.
    """
    return CIDR(s if '/' in s else '%s/32' % s)
class Table(PrettyTable):
    """PrettyTable subclass that appends a totals footer row below the body.

    NOTE(review): relies on internals of the 2011-era prettytable API
    (_stringify_row, _stringify_hrule, positional get_string) -- verify
    against the installed prettytable version before touching.
    """
    def get_string(self, start=0, end=None, fields=None,
            header=True, border=True, hrules=FRAME, sortby=None, reversesort=False):
        """Render the table, then append the footer row and a closing rule."""
        string = PrettyTable.get_string(self, start, end, fields, header, border, hrules, sortby, reversesort)
        # Only append the footer when the full, headed table was rendered.
        if header and not start and not end:
            string += '\n' + self._stringify_footer(fields, border, hrules)
            if hrules != NONE:
                string += '\n' + self._stringify_hrule(fields, border)
        return string
    def set_footer(self, footer):
        """Store the footer row: a list of cell values, one per field."""
        self.footer = footer
    def _stringify_footer(self, fields=None, border=True, hrules=FRAME):
        """Render the stored footer through PrettyTable's row formatter."""
        return self._stringify_row(self.footer, fields, border, hrules)
class FileType(argparse.FileType):
    """argparse.FileType that transparently opens gzip-compressed files."""
    def __call__(self, string):
        """Open *string*: '.gz' names go through gzip, everything else through argparse."""
        if not string.endswith('.gz'):
            return super(FileType, self).__call__(string)
        return gzip.open(string)
class PoolMock(object):
    """Synchronous stand-in exposing the map() API of multiprocessing.Pool.

    Used when --concurrency is 1 so the rest of main() can treat both modes
    identically.
    """
    def map(self, *args):
        """Run the builtin map directly in this process."""
        result = map(*args)
        return result
def main():
    """Parse arguments, aggregate log traffic per host (optionally across
    worker processes), and print the report as a table or CSV."""
    parser = argparse.ArgumentParser()
    # NOTE(review): default here is 4 while the module constant CONCURRENCY
    # is 3 and is never used -- confirm which value is intended.
    parser.add_argument('-c', '--concurrency', type=int, default=4)
    parser.add_argument('-f', '--format', action='store', default=DEFAULT_FORMAT, choices=CHOICES_FORMAT)
    parser.add_argument('-l', '--local', action='append', metavar='CIDR', dest='local_nets', type=parse_cidr)
    parser.add_argument('-p', '--period', default=DEFAULT_PERIOD, choices=CHOICES_PERIOD)
    parser.add_argument('-r', '--resolve', action='store_true', default=False)
    # NOTE(review): default is evaluated once at import time -- acceptable
    # for a one-shot CLI, wrong for a long-lived process.
    parser.add_argument('-w', '--when', default=dt.date.today(), type=parse_date, metavar='ISO_DATE')
    parser.add_argument('files', type=FileType(), metavar='FILENAME', nargs='*', default=[sys.stdin])
    args = parser.parse_args(sys.argv[1:])
    # Normalize the requested date into a [start, end) reporting window.
    if args.period == PERIOD_DAY:
        start = args.when
        end = start + rdelta(days=1)
    elif args.period == PERIOD_WEEK:
        start = args.when + rdelta(weekday=MO(-1))  # back up to Monday
        end = start + rdelta(weeks=1)
    elif args.period == PERIOD_MONTH:
        start = args.when + rdelta(day=1)  # first day of the month
        end = start + rdelta(months=1)
    else:
        raise Exception('period argument error')
    # Fan out to worker processes unless concurrency is disabled.
    if args.concurrency > 1:
        from multiprocessing import Pool
        pool = Pool(processes=args.concurrency)
    else:
        pool = PoolMock()
    mapargs = {
        'local_nets': args.local_nets or DEFAULT_LOCAL_NETS,
        # NOTE(review): strftime('%s') (epoch seconds) is a glibc extension,
        # not portable -- verify on the target platform.
        'start': int(start.strftime('%s')),
        'end': int(end.strftime('%s'))
    }
    # Presumably partial() could not be shipped to Pool workers before 2.7,
    # hence the explicit curry/uncurry fallback -- TODO confirm.
    if sys.version_info > (2, 7):
        mapping = pool.map(partial(report_map, **mapargs), iter_rows(args.files))
    else:
        mapping = pool.map(report_map_uncurry, iter_rows_curry(args.files, **mapargs))
    result = pool.map(report_reduce, report_partition(mapping))
    if args.resolve:
        result = list(resolve(result))
    if args.format == FORMAT_TABLE:
        table = Table([FIELD_HOST, FIELD_IN, FIELD_OUT])
        table.set_field_align(FIELD_HOST, 'l')
        table.set_field_align(FIELD_IN, 'r')
        table.set_field_align(FIELD_OUT, 'r')
        for host, traffic in result:
            table.add_row([host, units(traffic[0]), units(traffic[1])])
        # Column-wise grand totals for the footer row.
        totals = map(sum, zip(*[r[1] for r in result]))
        table.set_footer(['', units(totals[0]), units(totals[1])])
        table.printt(sortby=FIELD_HOST)
    if args.format == FORMAT_CSV:
        writer = csv.writer(sys.stdout)
        writer.writerow(['host', 'in', 'out'])
        for host, row in result:
            writer.writerow(list(chain([host], row)))
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        # Exit quietly on Ctrl-C instead of dumping a traceback.
        pass
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment