Skip to content

Instantly share code, notes, and snippets.

@SegFaultAX
Created January 23, 2017 23:24
Show Gist options
  • Save SegFaultAX/05e0f76a8dd5dd5d28964585f2b14049 to your computer and use it in GitHub Desktop.
Save SegFaultAX/05e0f76a8dd5dd5d28964585f2b14049 to your computer and use it in GitHub Desktop.
Parse nginx logs to sqlite3
#!/usr/bin/env python
# The MIT License (MIT)
# Copyright (c) 2016 Michael-Keith Bernard
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import re
import copy
import logging
import argparse
import sqlite3
import datetime
import calendar
import itertools
# NOTE(review): the e-mail address below looks like it was replaced with the
# placeholder "[email protected]" by the web page this file was copied from
# (e-mail obfuscation); restore the real address from the original source.
__author__ = "Michael-Keith Bernard ([email protected])"
def with_match(name, re):
    """Wrap a regex fragment in a named capture group.

    Args:
        name: group name, later available via ``match.groupdict()``.
        re: the pattern text to capture (note: this parameter shadows the
            ``re`` module, but only inside this function).

    Returns:
        The string ``(?P<name>re)``.
    """
    return '(?P<%s>%s)' % (name, re)
# Building blocks for the access-log pattern. Each is an uncompiled pattern
# string; LOG_PARTS wraps them in named groups via with_match().
IP_RE = r"\d+(?:\.\d+){3}"  # dotted-quad IPv4 only; IPv6 addresses won't match
QUOTED_STRING_RE = r'"[^\"]+"'  # non-empty "..." field, quotes included
TIMESTAMP_RE = r'\[[^\]]+\]'  # [...] field, brackets included
# Comma(+whitespace)-separated IPv4 list, or a bare "-".
IP_LIST_RE = r'(?:{ip}(?:\,\s+{ip})*)|-'.format(ip=IP_RE)
NUMERAL_RE = r'\d+(?:\.\d+)?'  # integer or decimal; a bare "-" does NOT match

# Fields of one log line, in order. The leading fields look like nginx's
# "combined" format; the trailing four presumably come from a custom
# log_format (e.g. X-Forwarded-For, $request_time, $upstream_response_time,
# $pipe) -- NOTE(review): confirm against the server's log_format.
LOG_PARTS = [
    with_match('remote_addr', IP_RE),
    # Literal "- -": only lines without an authenticated remote user match.
    with_match('user', r'- -'),
    with_match('timestamp', TIMESTAMP_RE),
    with_match('request', QUOTED_STRING_RE),
    with_match('response_code', r'\d+'),
    with_match('response_size', r'\d+'),
    with_match('referer', QUOTED_STRING_RE),
    with_match('user_agent', QUOTED_STRING_RE),
    with_match('forwarded_for', IP_LIST_RE),
    with_match('nginx_time', NUMERAL_RE),
    with_match('upstream_time', NUMERAL_RE),
    # "p" marks a pipelined request; anything else is ".".
    with_match('pipelined', r'[.p]'),
]

# Whole-line pattern: the fields above separated by runs of whitespace.
# The separator is a raw string: "\s" in a plain literal is an invalid escape
# sequence (DeprecationWarning, SyntaxWarning since Python 3.12). The runtime
# value is unchanged.
LOG_RE = r"^{}$".format(r"\s+".join(LOG_PARTS))
# DDL for the single `requests` table, executed by migrate_db().
# Column names match the keys produced by normalize_log(), plus `source`, which
# main() passes through load_db()/prepare_log() as a keyword argument. This lets
# insert_log() build its column list directly from the dict keys.
# `id` is never supplied by the inserts; as an `integer primary key` it aliases
# SQLite's rowid and is assigned automatically.
# prepare_log() stores `timestamp` as datetime.isoformat() text and
# `forwarded_for` as a comma-joined string.
# `if not exists` makes the statement safe to run on every start-up.
CREATE_REQUESTS_TABLE = """
create table if not exists requests (
id integer primary key,
source text,
upstream_time float,
nginx_time float,
response_size integer,
response_code integer,
user_agent text,
referer text,
remote_addr text,
forwarded_for text,
pipelined boolean,
timestamp text,
unix_epoch float,
method text,
path text,
query_string text,
protocol text
);
"""
def parse_args(args=None, parse=True):
    """Build the command-line parser and, optionally, parse arguments.

    Args:
        args: argument list to parse; None lets argparse read sys.argv.
        parse: when False, skip parsing and return None for the namespace.

    Returns:
        A ``(parser, namespace_or_None)`` tuple.
    """
    parser = argparse.ArgumentParser(
        description="Log processor for Graphite Web nginx")
    parser.add_argument("logfile", help="Path to nginx log file")
    # (flags, keyword options) per optional argument, in help-output order.
    optional = (
        (("-d", "--db"), {"default": ":memory:",
                          "help": "Path to sqlite3 database"}),
        (("-s", "--source"), {"default": "graphite",
                              "help": "Source of logs (eg which web server)"}),
        (("-b", "--batch"), {"default": 500,
                             "type": int,
                             "help": "Batch size for inserts"}),
    )
    for flags, options in optional:
        parser.add_argument(*flags, **options)
    if not parse:
        return parser, None
    return parser, parser.parse_args(args)
def migrate_db(db):
    """Apply the schema (currently just the `requests` table) and commit.

    Args:
        db: an open sqlite3 connection.
    """
    db.cursor().execute(CREATE_REQUESTS_TABLE)
    db.commit()
def setup_db(path, migrations=True):
    """Open a sqlite3 connection configured for this tool.

    Args:
        path: database file path (":memory:" for an in-memory database).
        migrations: when true (default), run migrate_db() on the connection.

    Returns:
        The sqlite3 connection, with ``row_factory`` set to ``sqlite3.Row``.
    """
    connection = sqlite3.connect(path)
    connection.row_factory = sqlite3.Row
    if not migrations:
        return connection
    migrate_db(connection)
    return connection
def parse_log(log_line):
    """Match one raw access-log line against LOG_RE.

    Surrounding whitespace is stripped first.

    Returns:
        The dict of named captures (all strings) on a match, otherwise None.
    """
    found = re.match(LOG_RE, log_line.strip())
    if found is None:
        return None
    return found.groupdict()
def parse_date(timestamp):
    """Parse a bracketed nginx log timestamp into UTC.

    Accepts the form captured by TIMESTAMP_RE, e.g.
    ``[10/Oct/2000:13:55:36 -0700]``. Any ``+HHMM``/``-HHMM`` UTC offset is
    honoured: the wall-clock time is shifted to UTC so the returned datetime
    and epoch describe the same instant. For ``+0000`` the datetime equals the
    logged wall-clock time.

    Args:
        timestamp: timestamp string including the surrounding brackets.

    Returns:
        ``(dt, epoch)``: a naive datetime expressed in UTC, and the integer
        seconds since the Unix epoch for that instant.

    Raises:
        ValueError: if the brackets, the date/time part or the offset are
            malformed.
    """
    if not (timestamp.startswith('[') and timestamp.endswith(']')):
        raise ValueError(
            "Timestamp must be enclosed in brackets: %r" % (timestamp,))
    clock, _, offset = timestamp[1:-1].rpartition(' ')
    # %b is locale-dependent, as in nginx's own English month abbreviations.
    dt = datetime.datetime.strptime(clock, '%d/%b/%Y:%H:%M:%S')
    tz = re.match(r'^([+-])(\d{2})(\d{2})$', offset)
    if tz is None:
        raise ValueError("Invalid UTC offset in timestamp: %r" % (timestamp,))
    delta = datetime.timedelta(hours=int(tz.group(2)),
                               minutes=int(tz.group(3)))
    # local = UTC + offset, therefore UTC = local - offset.
    utc = dt - delta if tz.group(1) == '+' else dt + delta
    return utc, calendar.timegm(utc.timetuple())
def parse_request(request):
    """Split a quoted HTTP request line into its components.

    Args:
        request: the request field including its surrounding double quotes,
            e.g. ``'"GET /render?target=foo HTTP/1.1"'``.

    Returns:
        ``(method, path, query_string, protocol)`` strings. The query string is
        everything after the first ``?`` (exclusive), or ``""`` if there is
        none.

    Request fields that lack a target and/or protocol (e.g. a bare ``"-"``
    placeholder, an HTTP/0.9-style ``"GET /"``, or non-HTTP junk) do not raise.
    The missing parts come back as ``""``, so one odd line cannot abort a
    whole import.
    """
    req = request[1:-1]  # drop the surrounding quotes
    method, _, rest = req.partition(" ")
    if " " in rest:
        # The protocol is the last token, so a target containing spaces
        # stays intact.
        full_path, protocol = rest.rsplit(" ", 1)
    else:
        full_path, protocol = rest, ""
    path, _, qs = full_path.partition("?")
    return method, path, qs, protocol
def normalize_log(parsed):
    """Convert the raw regex captures of one log line into typed values.

    Args:
        parsed: dict of named-group captures from parse_log() (all strings).

    Returns:
        A dict keyed by `requests` column names (all except `id` and
        `source`):
        - upstream/nginx times become floats;
        - response size and code become ints;
        - user_agent/referer lose their surrounding quotes;
        - forwarded_for becomes a list of addresses;
        - pipelined becomes a bool;
        - timestamp/unix_epoch come from parse_date();
        - method/path/query_string/protocol come from parse_request().

    Raises:
        ValueError: propagated from parse_date(), parse_request() or the
            numeric conversions on malformed fields.
    """
    ts, epoch = parse_date(parsed['timestamp'])
    method, path, qs, protocol = parse_request(parsed['request'])
    return {
        'upstream_time': float(parsed['upstream_time']),
        'nginx_time': float(parsed['nginx_time']),
        'response_size': int(parsed['response_size']),
        'response_code': int(parsed['response_code']),
        # Strip the double quotes included in the QUOTED_STRING_RE captures.
        'user_agent': parsed['user_agent'][1:-1],
        'referer': parsed['referer'][1:-1],
        'remote_addr': parsed['remote_addr'],
        # Split on the same separator IP_LIST_RE accepts (comma + any
        # whitespace) so no stray spaces end up inside addresses. A bare "-"
        # yields ["-"].
        'forwarded_for': re.split(r',\s+', parsed['forwarded_for']),
        'pipelined': parsed['pipelined'] == 'p',
        'timestamp': ts,
        'unix_epoch': epoch,
        'method': method,
        'path': path,
        'query_string': qs,
        'protocol': protocol,
    }
def prepare_log(log, **kwargs):
    """Prepare a normalized log dict for database insertion.

    Args:
        log: dict from normalize_log(); it is not modified.
        **kwargs: extra columns (e.g. ``source=...``) merged into the result,
            overriding keys of the same name.

    Returns:
        A new dict where ``timestamp`` is ISO-8601 text and ``forwarded_for``
        is a comma-joined string.
    """
    # A shallow copy is enough: the two non-scalar values (the datetime and the
    # forwarded_for list) are replaced below, never mutated, so the caller's
    # dict is left intact. This avoids a per-line copy.deepcopy in the hot
    # path. Any other mutable values in `log` would be shared, not copied.
    p = dict(log)
    p['timestamp'] = p['timestamp'].isoformat()
    p['forwarded_for'] = ",".join(p['forwarded_for'])
    p.update(kwargs)
    return p
def insert_log(cur, log):
    """Insert one prepared log dict as a row of `requests`.

    The dict keys are used verbatim as column names (they come from
    prepare_log(), not from user input); the values are bound as ``?``
    parameters.

    Args:
        cur: sqlite3 cursor to execute on.
        log: mapping of column name to value.
    """
    columns = []
    params = []
    for column, value in log.items():
        columns.append(column)
        params.append(value)
    placeholders = ", ".join("?" for _ in columns)
    statement = "insert into requests ({}) values ({})".format(
        ", ".join(columns), placeholders)
    cur.execute(statement, params)
def load_db(db, logs, **kwargs):
    """Insert a batch of processed log entries, then commit once.

    Args:
        db: sqlite3 connection.
        logs: iterable of ``(raw, parsed, normalized)`` 3-tuples; only the
            normalized dict is written.
        **kwargs: extra columns (e.g. ``source=...``) added to every row via
            prepare_log().
    """
    cursor = db.cursor()
    for _line, _fields, normalized in logs:
        insert_log(cursor, prepare_log(normalized, **kwargs))
    # One commit per call, so the whole batch is committed together.
    db.commit()
def batches(l, length=500):
    """Lazily split an iterable into lists of at most `length` items.

    The last list may be shorter; nothing is yielded for empty input (or for
    ``length=0``).
    """
    source = iter(l)
    # iter(callable, sentinel) keeps pulling slices until one comes back empty.
    for chunk in iter(lambda: list(itertools.islice(source, length)), []):
        yield chunk
def main():
    """Command-line entry point: load an nginx access log into sqlite3.

    Lines are inserted in batches of ``--batch`` rows, tagged with
    ``--source``.

    Lines that do not match LOG_RE are skipped and logged at DEBUG. Lines that
    match but cannot be normalized (normalize_log raises ValueError, e.g. a
    malformed timestamp or request) are skipped with a WARNING instead of
    aborting the import. The log file and the database connection are closed
    when processing ends.
    """
    _parser, args = parse_args()
    db = setup_db(args.db)

    def process(path):
        """Yield ``(raw, parsed, normalized)`` for every usable line of `path`."""
        # `with` closes the file even if the consumer stops early or an
        # exception propagates (the original leaked the handle).
        with open(path) as handle:
            for line in handle:
                log = line.strip()
                parsed = parse_log(log)
                if not parsed:
                    logging.debug("Invalid log: %s", log)
                    continue
                try:
                    normalized = normalize_log(parsed)
                except ValueError as exc:
                    logging.warning("Unable to normalize log: %s (%s)", log, exc)
                    continue
                yield log, parsed, normalized

    try:
        for batch in batches(process(args.logfile), args.batch):
            load_db(db, batch, source=args.source)
    finally:
        db.close()
# Run the importer only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment