Last active
November 8, 2019 05:00
-
-
Save Frick/3cd2299fb23c4405cbbac502199b922b to your computer and use it in GitHub Desktop.
Well this got out of hand quickly...
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Minimally parse logs in order to merge their contents in sorted order with minimal | |
resource usage - primarily memory, in order to support very large log files. For | |
example, whether 10 MBs or 10 GBs of logs, <15 MBs of memory is typically allocated. | |
Assumes each individual log file is already sorted. | |
The bash 'globstar' option is recommended in order to easily pass all log paths like: | |
./logsort.py mongodb-*/**/*.log | |
To enable: | |
shopt -s globstar | |
""" | |
import argparse | |
from datetime import datetime | |
import errno | |
import heapq | |
import os | |
from pathlib import Path | |
import re | |
import sys | |
# Matches a date and time such as "2019-11-08T05:00:00.123",
# "2019/11/08 05:00:00" or "2019-11-08_05:00:00,123456". group(1) is the whole
# timestamp, including the optional 3-6 digit fractional seconds.
TIMESTAMP_RE = re.compile(r'(\d{4}[/-]\d{2}[/-]\d{2}[ T_]*\d{2}:\d{2}:\d{2}([.,]\d{3,6})?)')
# Separators stripped from a TIMESTAMP_RE match so timestamps compare as plain
# strings. This must cover every field separator TIMESTAMP_RE accepts
# (including '_'); a leftover separator would corrupt the sort order.
# NOTE(review): the fractional-second mark ('.' or ',') is kept as-is, so files
# mixing the two styles may not interleave correctly within a single second.
SIMPLETS_RE = re.compile(r'[/ T:_-]')
# Normalized time bounds; main() sets these from args before any Log is iterated.
START_TIME_RANGE = None
END_TIME_RANGE = None
class Log:
    """Iterable wrapper around one open log file.

    Iterating yields one string per log entry: a normalized timestamp, a '|'
    separator, the optional filename prefix, and then the entry's text (the
    timestamped line plus any following lines that carry no timestamp).
    Entries are filtered against the module-level START_TIME_RANGE and
    END_TIME_RANGE, which must be set before iterating.
    """

    _prefix = ''
    _fd = None

    def __init__(self, fd, prefix):
        """Wrap *fd*, an open text-mode file object.

        *prefix* selects what is prepended to each entry's text: 'full' uses
        ``fd.name``, 'base' uses its basename, and anything else adds nothing.
        """
        self._fd = fd
        if prefix == 'full':
            self._prefix = fd.name + ': '
        elif prefix == 'base':
            self._prefix = os.path.basename(fd.name) + ': '

    def __iter__(self):
        """Yield in-range entries as ``timestamp|prefix+text`` strings.

        Assumes the file is already sorted by timestamp, so iteration stops at
        the first entry later than END_TIME_RANGE. Lines that precede the first
        timestamped line belong to no entry and are dropped. Yielding them would
        give heapq.merge() an item whose sort key is arbitrary text rather than
        a timestamp, which can push this file's entries out of order.
        """
        parts = None  # pieces of the pending in-range entry, or None
        for line in self._fd:
            match = TIMESTAMP_RE.search(line)
            if match is None:
                # Continuation of the current entry (e.g. a multi-line message).
                # Discarded when no in-range entry is pending.
                if parts is not None:
                    parts.append(line)
                continue
            # A new timestamp ends the previous entry.
            if parts is not None:
                yield ''.join(parts)
                parts = None
            timestamp = SIMPLETS_RE.sub('', match.group(1))
            if timestamp < START_TIME_RANGE:
                continue  # skip this entry along with its continuation lines
            if timestamp > END_TIME_RANGE:
                return
            parts = [timestamp, '|', self._prefix, line]
        if parts is not None:
            yield ''.join(parts)
def process_logs(logs, prefix):
    """Wrap each open log file in a Log object, preserving the input order.

    Args:
        logs: iterable of open text-mode file objects.
        prefix: filename-prefix mode handed unchanged to every Log.

    Returns:
        A list with one Log per element of *logs*.
    """
    return [Log(fd, prefix) for fd in logs]
def parse_timestamp(line):
    """Return the sort key of a merged entry: the text before the first '|'.

    Used as the ``key`` for heapq.merge(). Entries produced by Log have the
    form ``timestamp|prefix+text``. An entry containing no '|' is returned whole.
    """
    # partition() splits only at the first '|', so any '|' inside the log text
    # never leaks into the key.
    return line.partition('|')[0]
def sort_logs(logs, output): | |
"""Sort the given list of Log objects, writing output to given file object""" | |
for line in heapq.merge(*logs, key=parse_timestamp): | |
try: | |
print(line.split('|', 1)[1], file=output, end='') | |
except IndexError: | |
# no timestamp found prefixed | |
pass | |
except IOError as err: | |
# gracefully handle pipes closing | |
if err.errno == errno.EPIPE: | |
pass | |
def find_logs(directory):
    """Recursively find and open every ``*.log`` file under *directory*.

    Paths are visited in sorted order so the result (and therefore tie order
    in the merge) is deterministic. Directories whose names end in ``.log``
    are skipped.

    Returns:
        A list of file objects opened for reading in text mode.

    Raises:
        OSError (e.g. PermissionError): re-raised unchanged after closing any
        files already opened by this call.
    """
    logs = []
    try:
        for path in sorted(Path(directory).glob('**/*.log')):
            if path.is_file():
                logs.append(open(path, 'r'))
    except OSError:
        # Don't leak the handles opened before the failure.
        for log in logs:
            log.close()
        raise
    return logs
def _usage_error(parser, message):
    """Print *parser*'s usage and ``error: <message>`` to stderr, then exit 1."""
    parser.print_usage(file=sys.stderr)
    print(f'error: {message}', file=sys.stderr)
    sys.exit(1)


def _normalize_time(parser, value, label):
    """Return the TIMESTAMP_RE match in *value* with its separators stripped.

    This is the same normalized form that Log compares entry timestamps against.
    *label* ('start' or 'end') names the option in the error reported when
    *value* contains no recognizable timestamp.
    """
    match = TIMESTAMP_RE.search(value)
    if match is None:
        _usage_error(parser, f'{label} time "{value}" not in recognized '
                             'format such as ISO8601')
    return SIMPLETS_RE.sub('', match.group(1))


def parse_args(args):
    """Parse and validate the command-line arguments.

    Args:
        args: argument strings, typically ``sys.argv[1:]``.

    Returns:
        The argparse namespace, extended or adjusted as follows:
          * ``prefix``: 'base' (default), 'full' or '' (no prefix);
          * ``start_time`` / ``end_time``: normalized to the digits-only form
            that Log compares timestamps in;
          * ``logs``: the positional files plus every ``.log`` file found
            under each -d/--directory.

    Exits with status 1, after printing usage to stderr, on invalid input.
    """
    # The defaults are written directly in normalized form. Each is also used
    # below as a sentinel to tell whether the user supplied a value.
    epoch = '19691231190000.000'
    now = datetime.now().strftime('%Y%m%d%H%M%S.%f')[:-3]
    parser = argparse.ArgumentParser(description='Merge multiple log files, '
                                     'sorted by timestamp. Optionally prefix and/or '
                                     'filter by time range.')
    parser.add_argument('-o', '--output', type=argparse.FileType('w'),
                        default=sys.stdout,
                        help='where the merged file should be output '
                             '(default: stdout)')
    parser.add_argument('--no-prefix', action='store_true',
                        help='do not prefix log lines with filename')
    parser.add_argument('--full-prefix', action='store_true',
                        help='include full path of filename as log line prefix')
    parser.add_argument('-s', '--start-time', default=epoch,
                        help='filter logs by timestamp, starting at given time')
    parser.add_argument('-e', '--end-time', default=now,
                        help='filter logs by timestamp, ending at given time')
    files = parser.add_argument_group(title='specifying log files',
                                      description='use at least one, or potentially '
                                      'both, of the below methods to specify log '
                                      'files to merge and sort')
    files.add_argument('-d', '--directory', type=str, default=None, nargs='*',
                       help='one or more (space-delimited) directories to '
                            'recursively search for all .log files')
    files.add_argument('logs', nargs='*', type=argparse.FileType('r'),
                       help='individual files, useful for shell globbing - '
                            'specify these before any -d/--directory')
    parsed_args = parser.parse_args(args)

    # A bare "-d" yields [], which must count as "no directories" too.
    if not parsed_args.directory and not parsed_args.logs:
        _usage_error(parser, 'at least one of logs or -d/--directory is required.')

    if parsed_args.no_prefix:
        parsed_args.prefix = ''
    elif parsed_args.full_prefix:
        parsed_args.prefix = 'full'
    else:
        parsed_args.prefix = 'base'

    # Only user-supplied times need normalizing; the defaults already are.
    if parsed_args.start_time != epoch:
        parsed_args.start_time = _normalize_time(parser, parsed_args.start_time, 'start')
    if parsed_args.end_time != now:
        parsed_args.end_time = _normalize_time(parser, parsed_args.end_time, 'end')

    # Search any given directories for logs.
    for directory in parsed_args.directory or []:
        if not os.path.isdir(directory):
            _usage_error(parser, f'searching -d/--directory: "{directory}" '
                                 'is not a directory')
        try:
            parsed_args.logs += find_logs(directory)
        except PermissionError as err:
            _usage_error(parser, f'searching -d/--directory: {err}')
    return parsed_args
def main():
    """Entry point: parse sys.argv, publish the time bounds, merge the logs."""
    global START_TIME_RANGE, END_TIME_RANGE
    options = parse_args(sys.argv[1:])
    # Log.__iter__ reads these module-level bounds, so set them before merging.
    START_TIME_RANGE, END_TIME_RANGE = options.start_time, options.end_time
    sort_logs(process_logs(options.logs, options.prefix), options.output)


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment