Log parser that reads multiple sources and captures SQL statements to follow a row's lifecycle; it can optionally filter for specific tables and specific IDs.
#! /usr/bin/env python3
"""
This script is a log file parser that looks for SQL UPDATE (and DELETE) statements and filters for specified tables and IDs.
It can read from multiple sources and write to a separate file for each table and ID combination.
examples:
`./looklog.py -t user,order *.log`: prints UPDATE statements for the ORDER and USER tables in *.log to stdout
`tail -F current.log | ./looklog.py -t user -i 12345`: tails current.log in real time and filters for updates to
    USER 12345
`./looklog.py -t user -s some.log`: writes a file in the form `user-<id>.log` for each USER updated in some.log
`./looklog.py -w *.log`: the output includes the filename of the log being parsed
`./looklog.py -t order -j user *.log`: updates to the ORDER table, matched by ID or by the USER_ID join column
`./looklog.py -I -t user`: also include INSERT/SELECT pairs, a pattern that many ORMs use when creating rows
-h for options
"""

__version__ = '0.2.7'
__author__ = '[email protected]'

import sys
import os
import argparse
import re
import time
import datetime
import heapq
import glob
from operator import itemgetter
import itertools
import collections

MAX_INSERTS = 100

# TODO: refactor this since we're no longer really using this as a true dict
class FileManager(collections.OrderedDict):
    """Keep only the max_open most recent files open"""
    def __init__(self, max_open=20):
        self.seen = set()
        self.max_open = max_open
        super().__init__()

    def __getitem__(self, fname):
        fd = None
        if fname in self:
            fd = super().__getitem__(fname)
            assert not fd.closed
        if not fd:
            # reopen in append mode if we have written to this file before, otherwise truncate
            fd = open(fname, 'a' if fname in self.seen else 'w')
            self.__setitem__(fname, fd)
        else:
            # mark the handle as most recently used
            self.move_to_end(fname)
        return fd

    def __setitem__(self, fname, fd):
        self.seen.add(fname)
        assert not fd.closed
        super().__setitem__(fname, fd)
        if len(self) >= self.max_open:
            # close and evict the least recently used handle
            oldest = next(iter(self))
            self[oldest].close()
            del self[oldest]
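
# Illustrative usage sketch (hypothetical; not executed by this script) of the
# LRU-style eviction above, assuming max_open=2:
#   fm = FileManager(max_open=2)
#   fm['a.log'].write('1')   # a.log opened with mode 'w' on first use
#   fm['b.log'].write('2')   # reaching max_open closes and evicts a.log, the least recently used handle
#   fm['a.log'].write('3')   # a.log reopened with mode 'a', so the earlier '1' is preserved
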
arg_parse = argparse.ArgumentParser(description='Parse SQL statements in Looker logs')
arg_parse.add_argument('input_paths', metavar='LOGFILE', nargs='*',
                       help='log file(s)')
arg_parse.add_argument('-t', '--table', metavar='TABLE[,TABLE ...]',
                       help='name of SQL table(s) [can be comma separated list]')
arg_parse.add_argument('-i', '--id', metavar='ID[,ID ...]',
                       help='ID(s) [can be comma separated list]')
arg_parse.add_argument('-s', '--split', action='store_true',
                       help='split output into separate files named <table>-<id>.log')
arg_parse.add_argument('-d', '--dir', default='.',
                       help='name of directory to write split files to (requires -s)')
arg_parse.add_argument('-w', '--with_filename', action='store_true',
                       help='include source filename when writing matched lines')
arg_parse.add_argument('-j', '--join', metavar='TABLE[,TABLE ...]',
                       help='also search the id column TABLE_ID for join table updates [can be comma separated list]')
arg_parse.add_argument('-I', '--inserts', action='store_true',
                       help='track INSERT/SELECT pairs (Sequel and similar ORM logs only)')
arg_parse.add_argument('-o', '--other',
                       help='any Python-compatible regex; matching lines go to <dir>/other when -s is given, otherwise stdout')
args = arg_parse.parse_args()

table_filter = '|'.join(map(str.strip, (args.table or r'.*?').split(',')))
id_filter = '|'.join(map(str.strip, (args.id or r'\d+').split(',')))
join_filter = '|'.join(map(lambda s: '{}_id'.format(s.strip()), (args.join or '').split(',')))

update_statement_re = re.compile(
    r'(?:UPDATE |DELETE FROM )["`](?P<table>{tables})["`].*WHERE.*["`](?:{id_cols})["`] = (?P<id>{ids})\D'
    .format(tables=table_filter, ids=id_filter, id_cols='|'.join(['id'] + join_filter.split('|'))),
    re.I)
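
# Hypothetical examples (not real log lines) of statements the pattern above matches
# when run with `-t user -i 12345`:
#   UPDATE "user" SET "name" = 'alice' WHERE ("id" = 12345)
#   DELETE FROM "user" WHERE ("id" = 12345)
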
# insert/select tracking is for Sequel generated rows only
insert_statement_re = re.compile(
    r'INSERT INTO ["`](?P<table>{tables})["`]'
    .format(tables=table_filter), re.I)
insert_statement_with_join_re = re.compile(
    r'INSERT INTO ["`](?P<table>{tables})["`].*\(["`](?P<id_cols>{id_cols})["`].*VALUES \((?P<id>{ids})\D'
    .format(tables=table_filter, ids=id_filter, id_cols=join_filter), re.I)
# the SELECT that follows an ORM INSERT is keyed on the primary key "id" (or a join id when -j is given)
select_statement_re = re.compile(
    r'SELECT \* FROM ["`](?P<table>{tables})["`] WHERE \(["`](?:{id_cols})["`] = (?P<id>{ids})\D'
    .format(tables=table_filter, ids=id_filter,
            id_cols='|'.join(['id'] + join_filter.split('|'))), re.I)
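
# For illustration only (hypothetical Sequel-style log lines): an ORM typically logs the
# INSERT without the generated id, then immediately SELECTs the new row back, e.g.
#   INSERT INTO "user" ("name") VALUES ('alice')
#   SELECT * FROM "user" WHERE ("id" = 12345) LIMIT 1
# The INSERT line is buffered in recent_inserts (below) and written out together with the
# matching SELECT once it appears.
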
date_re = re.compile(r'(?P<timestamp>\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d.\d+ -\d{4})')

if args.other:
    other_statement_re = re.compile(args.other, re.I)

fm = FileManager()
recent_inserts = collections.OrderedDict()
other_fd = open('{}/other'.format(args.dir), 'w') if args.split else sys.stdout

if not args.input_paths:
    # no paths specified so just read stdin
    input_files = [sys.stdin]
else:
    # combine all the glob'ed filenames
    input_files = map(open, itertools.chain(*map(lambda p: glob.glob(os.path.expanduser(p)), args.input_paths)))

# Use heapq.merge to lazily sort the input files
# https://stackoverflow.com/questions/12460943/merging-pre-sorted-files-without-reading-everything-into-memory
def timestamp_from_line(_line):
    ts_m = date_re.match(_line)
    if ts_m is not None:
        return time.mktime(datetime.datetime.strptime(ts_m.group('timestamp'),
                                                      '%Y-%m-%d %H:%M:%S.%f %z').timetuple())
    return 0
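
# e.g. (hypothetical line prefix) timestamp_from_line('2019-09-21 00:34:56.123 -0700 [INFO] ...')
# returns a sortable epoch-style value; lines without a leading timestamp return 0 and sort first.
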
# these next three lines perfectly illustrate why python is so damn awesome :)
decorated = [((timestamp_from_line(line), (line, fd)) for line in fd) for fd in input_files]
merged = heapq.merge(*decorated)
lines_and_fds = map(itemgetter(-1), merged)
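
# For intuition (hypothetical values, not executed here): heapq.merge interleaves
# pre-sorted streams lazily, e.g.
#   list(heapq.merge([(1, 'a'), (3, 'c')], [(2, 'b')])) == [(1, 'a'), (2, 'b'), (3, 'c')]
# Each input file is decorated with a timestamp key above, and itemgetter(-1) strips the
# key back off, leaving (line, fd) pairs in global timestamp order.
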
def write_line(_in_fd, _m, _line, _other=False):
    _out_fd = other_fd if _other else sys.stdout
    if args.split and not _other:
        # show progress if writing to split files
        sys.stdout.write('.')
        sys.stdout.flush()
        fname = '{dir}/{table}-{id}.log'.format(dir=args.dir, table=_m.group('table'), id=_m.group('id'))
        _out_fd = fm[fname]
    # write the log line (with optional source filename)
    _out_fd.write('{file}> {line}'.format(line=_line, file=_in_fd.name)
                  if args.with_filename else '{line}'.format(line=_line))
    _out_fd.flush()
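
# e.g. with --with_filename, a matched line is prefixed with its source file, roughly:
#   some.log> 2019-09-21 00:34:56.123 -0700 ... UPDATE "user" ...   (hypothetical output)
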
# process each line from the merged logs
for (line, in_fd) in lines_and_fds:
    if args.inserts:
        if args.join:
            insert_with_join_m = insert_statement_with_join_re.search(line)
            if insert_with_join_m:
                write_line(in_fd, insert_with_join_m, line)
                continue
        insert_m = insert_statement_re.search(line)
        if insert_m:
            # remember the INSERT until the SELECT that fetches the new row shows up
            recent_inserts[insert_m.group('table')] = line
            if len(recent_inserts) > MAX_INSERTS:
                # drop the oldest pending INSERT to bound memory use
                recent_inserts.popitem(last=False)
            continue
        select_m = select_statement_re.search(line)
        if select_m:
            table = select_m.group('table')
            if table in recent_inserts:
                write_line(in_fd, select_m, recent_inserts[table])
                write_line(in_fd, select_m, line)
                del recent_inserts[table]
            continue
    update_m = update_statement_re.search(line)
    if update_m:
        write_line(in_fd, update_m, line)
        continue
    if args.other:
        other_m = other_statement_re.search(line)
        if other_m:
            write_line(in_fd, other_m, line, True)
            continue