Parse nginx logs to sqlite3
#!/usr/bin/env python
# The MIT License (MIT)
# Copyright (c) 2016 Michael-Keith Bernard
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import re
import copy
import logging
import argparse
import sqlite3
import datetime
import calendar
import itertools

__author__ = "Michael-Keith Bernard ([email protected])"
def with_match(name, regex):
    """Wrap a regex fragment in a named capture group"""
    return r'(?P<{}>{})'.format(name, regex)

IP_RE = r"\d+(?:\.\d+){3}"
QUOTED_STRING_RE = r'"[^\"]+"'
TIMESTAMP_RE = r'\[[^\]]+\]'
IP_LIST_RE = r'(?:{ip}(?:\,\s+{ip})*)|-'.format(ip=IP_RE)
NUMERAL_RE = r'\d+(?:\.\d+)?'

# One named capture group per field of the custom nginx access-log format
LOG_PARTS = [
    with_match('remote_addr', IP_RE),
    with_match('user', r'- -'),
    with_match('timestamp', TIMESTAMP_RE),
    with_match('request', QUOTED_STRING_RE),
    with_match('response_code', r'\d+'),
    with_match('response_size', r'\d+'),
    with_match('referer', QUOTED_STRING_RE),
    with_match('user_agent', QUOTED_STRING_RE),
    with_match('forwarded_for', IP_LIST_RE),
    with_match('nginx_time', NUMERAL_RE),
    with_match('upstream_time', NUMERAL_RE),
    with_match('pipelined', r'[.p]'),
]

LOG_RE = r"^{}$".format(r"\s+".join(LOG_PARTS))
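# Illustrative example of a line LOG_RE is expected to match (the values are
# made up, not taken from the gist):
#   10.0.0.1 - - [23/Jan/2017:23:24:00 +0000] "GET /render?target=foo HTTP/1.1" \
#       200 1234 "-" "curl/7.47.0" - 0.005 0.004 .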
CREATE_REQUESTS_TABLE = """
create table if not exists requests (
    id integer primary key,
    source text,
    upstream_time float,
    nginx_time float,
    response_size integer,
    response_code integer,
    user_agent text,
    referer text,
    remote_addr text,
    forwarded_for text,
    pipelined boolean,
    timestamp text,
    unix_epoch float,
    method text,
    path text,
    query_string text,
    protocol text
);
"""
def parse_args(args=None, parse=True):
    """Parse command-line arguments"""
    parser = argparse.ArgumentParser(
        description="Log processor for Graphite Web nginx")
    parser.add_argument("logfile",
        help="Path to nginx log file")
    parser.add_argument("-d", "--db",
        default=":memory:",
        help="Path to sqlite3 database")
    parser.add_argument("-s", "--source",
        default="graphite",
        help="Source of logs (e.g. which web server)")
    parser.add_argument("-b", "--batch",
        default=500,
        type=int,
        help="Batch size for inserts")
    res = parser.parse_args(args) if parse else None
    return parser, res
def migrate_db(db):
    """Run database migrations (create tables, etc.)"""
    cur = db.cursor()
    cur.execute(CREATE_REQUESTS_TABLE)
    db.commit()

def setup_db(path, migrations=True):
    """Initialize database connection"""
    db = sqlite3.connect(path)
    db.row_factory = sqlite3.Row
    if migrations:
        migrate_db(db)
    return db
def parse_log(log_line):
    """Parse a single log line"""
    match = re.match(LOG_RE, log_line.strip())
    return match.groupdict() if match else None

def parse_date(timestamp):
    """Parse the nginx time format into a datetime and a unix epoch"""
    fmt = '[%d/%b/%Y:%H:%M:%S +0000]'
    dt = datetime.datetime.strptime(timestamp, fmt)
    return dt, calendar.timegm(dt.timetuple())

def parse_request(request):
    """Parse the request into method, path, query string, and HTTP protocol"""
    req = request[1:-1]  # strip surrounding quotes
    method, rest = req.split(" ", 1)
    full_path, protocol = rest.rsplit(" ", 1)
    parts = full_path.split("?", 1)
    path, qs = parts if len(parts) > 1 else (parts[0], "")
    return method, path, qs, protocol
def normalize_log(parsed):
    """Clean up parsed log data"""
    n = {}
    ts, epoch = parse_date(parsed['timestamp'])
    method, path, qs, protocol = parse_request(parsed['request'])
    n['upstream_time'] = float(parsed['upstream_time'])
    n['nginx_time'] = float(parsed['nginx_time'])
    n['response_size'] = int(parsed['response_size'])
    n['response_code'] = int(parsed['response_code'])
    n['user_agent'] = parsed['user_agent'][1:-1]
    n['referer'] = parsed['referer'][1:-1]
    n['remote_addr'] = parsed['remote_addr']
    n['forwarded_for'] = parsed['forwarded_for'].split(", ")
    n['pipelined'] = parsed['pipelined'] == 'p'
    n['timestamp'] = ts
    n['unix_epoch'] = epoch
    n['method'] = method
    n['path'] = path
    n['query_string'] = qs
    n['protocol'] = protocol
    return n
def prepare_log(log, **kwargs):
    """Prepare a normalized log for database insertion"""
    p = copy.deepcopy(log)
    p['timestamp'] = p['timestamp'].isoformat()
    p['forwarded_for'] = ",".join(p['forwarded_for'])
    p.update(kwargs)
    return p

def insert_log(cur, log):
    """Insert a prepared log line into the database"""
    items = log.items()
    keys = [e[0] for e in items]
    values = [e[1] for e in items]
    sql = """insert into requests ({}) values ({})""".format(
        ", ".join(keys),
        ", ".join(["?"] * len(keys)))
    cur.execute(sql, values)
def load_db(db, logs, **kwargs):
    """Load logs into database"""
    cur = db.cursor()
    for (_raw, _parsed, normalized) in logs:
        prepared = prepare_log(normalized, **kwargs)
        insert_log(cur, prepared)
    db.commit()

def batches(l, length=500):
    """Yield successive chunks of up to `length` items from an iterable"""
    it = iter(l)
    while True:
        b = list(itertools.islice(it, length))
        if b:
            yield b
        else:
            break
def main():
    _parser, args = parse_args()
    db = setup_db(args.db)

    def process(f):
        with open(f) as fh:
            for line in fh:
                log = line.strip()
                parsed = parse_log(log)
                if not parsed:
                    logging.debug("Invalid log: %s", log)
                    continue
                normalized = normalize_log(parsed)
                yield log, parsed, normalized

    for batch in batches(process(args.logfile), args.batch):
        load_db(db, batch, source=args.source)

if __name__ == "__main__":
    main()
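A minimal usage sketch, not part of the gist: it assumes the script above is saved as nginx_to_sqlite.py so its functions can be imported, and the sample log line and source value are made up for illustration.

from nginx_to_sqlite import setup_db, parse_log, normalize_log, load_db  # hypothetical module name

# Made-up line in the format LOG_RE expects
sample = ('10.0.0.1 - - [23/Jan/2017:23:24:00 +0000] '
          '"GET /render?target=foo HTTP/1.1" 200 1234 "-" "curl/7.47.0" '
          '- 0.005 0.004 .')

db = setup_db(":memory:")             # connect and create the requests table
parsed = parse_log(sample)            # raw captured fields as strings
normalized = normalize_log(parsed)    # typed and cleaned-up fields
load_db(db, [(sample, parsed, normalized)], source="example")

row = db.execute(
    "select method, path, response_code from requests").fetchone()
print("%s %s -> %s" % (row["method"], row["path"], row["response_code"]))  # GET /render -> 200

Run end to end, the script does the same thing in batches over a whole file, e.g. python nginx_to_sqlite.py access.log --db requests.db --source graphite (the file names here are placeholders).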