Skip to content

Instantly share code, notes, and snippets.

@SegFaultAX
Last active February 15, 2018 01:11
Show Gist options
  • Save SegFaultAX/6d58181f58210309533a73d3649d6473 to your computer and use it in GitHub Desktop.
Quick and dirty slow log parser for elasticsearch (sqlite3 output)
#!/usr/bin/env python
# The MIT License (MIT)
# Copyright (c) 2016 Michael-Keith Bernard
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import re
import json
import sqlite3
import argparse
import attr
import parsec as p
__author__ = "Michael-Keith Bernard ([email protected])"
@attr.s(frozen=True)
class Field(object):
    """An immutable (label, content) pair emitted by the field parsers below."""
    label = attr.ib()    # field name, e.g. "timestamp"; None for anonymous fields
    content = attr.ib()  # raw field text (or parsed JSON, for the "source" field)
# Lexical helpers shared by every field parser below.
whitespace = p.regex(r"[\s,]*", re.I)  # separator runs: whitespace and/or commas


def lexeme(parser):
    """Wrap `parser` so it also consumes (and discards) trailing separators.

    Replaces the original `lambda p: ...`, whose parameter shadowed the
    `parsec as p` module import (and violated PEP 8 E731).
    """
    return parser << whitespace


any_content = p.regex(r"[^\[\]]*", re.I)  # anything up to the next bracket
label = p.regex(r"[\w-]+", re.I)          # bare field label, e.g. "took_millis"
lbrack = lexeme(p.string("["))
rbrack = lexeme(p.string("]"))
def bracketed(ps, label=None):
    """Parser for a "[...]"-delimited span whose interior matches `ps`.

    The inner result is wrapped in a Field tagged with `label` (None for
    anonymous fields). Trailing separators are consumed via `lexeme`.
    """
    @lexeme
    @p.generate
    def bracket_parser():
        yield lbrack
        inner = yield ps
        yield rbrack
        return Field(label, inner)
    return bracket_parser
def labeled(ps):
    """Parser for "<label><content>" where `<content>` matches `ps`.

    The parsed label text becomes the resulting Field's label.
    """
    @lexeme
    @p.generate
    def label_parser():
        name = yield label
        body = yield ps
        return Field(name, body)
    return label_parser
# A labeled field whose value is itself bracketed, e.g. `took[1.2s]`;
# parsecmap unwraps the inner Field so only its content string is kept.
simple_label = labeled(bracketed(any_content).parsecmap(lambda e: e.content))


def simple_bracket(field_label):
    """Parser for an anonymous "[...]" span, tagged with `field_label`.

    Replaces the original `lambda l: ...` assignment (PEP 8 E731).
    """
    return bracketed(any_content, field_label)


# Positional bracketed fields at the start of every slow-log line.
timestamp = simple_bracket("timestamp")
log_level = simple_bracket("log_level")
log_name = simple_bracket("log_name")
hostname = simple_bracket("hostname")
index_name = simple_bracket("index_name")
shard = simple_bracket("shard")
@p.generate
def source():
    """Parser for the trailing `source[{...json...}]` field of a log line.

    The JSON body may itself contain brackets, so the rest of the line is
    consumed greedily and the trailing "]" (plus an optional ",") is trimmed
    by hand before json-decoding.
    """
    yield p.string("source")
    yield lbrack
    raw = yield p.regex(r".*")
    raw = raw.strip()
    if raw[-1:] == ",":
        raw = raw[:-1]
    if raw[-1:] == "]":
        raw = raw[:-1]
    return Field("source", json.loads(raw))
def logify(*args):
    """Collapse Field objects into a single {label: content} dict."""
    return dict((field.label, field.content) for field in args)
@p.generate
def log():
    """Parser for one complete slow-log record, producing a plain dict.

    Fields are parsed strictly in the order they appear in the log format:
    six bracketed header fields, then six `name[value]` labeled fields
    (took, took_millis, types, stats, search_type, total_shards), then the
    JSON source payload.
    """
    field_parsers = (
        timestamp, log_level, log_name, hostname, index_name, shard,
        simple_label, simple_label, simple_label,   # took, took_millis, types
        simple_label, simple_label, simple_label,   # stats, search_type, total_shards
        source,
    )
    parsed = []
    for field_parser in field_parsers:
        result = yield field_parser
        parsed.append(result)
    return logify(*parsed)
log_line = whitespace >> log
def migrate(conn):
    """Create the `logs` table on `conn` if it does not exist, then commit.

    Safe to call repeatedly ("if not exists").
    """
    ddl = """
    create table if not exists logs (
        id integer primary key autoincrement,
        hostname text not null,
        index_name text not null,
        took_millis integer not null,
        source text not null
    )
    """
    conn.cursor().execute(ddl)
    conn.commit()
def dump_json(obj):
    """Serialize `obj` as compact, key-sorted JSON (stable across runs)."""
    return json.dumps(obj, separators=(",", ":"), sort_keys=True)
def drop_timestamp(source):
    """Recursively strip every "@timestamp" key from nested dicts/lists.

    Non-container values are returned unchanged; containers are rebuilt,
    never mutated in place.
    """
    if isinstance(source, list):
        return [drop_timestamp(item) for item in source]
    if not isinstance(source, dict):
        return source
    return {key: drop_timestamp(value)
            for key, value in source.items()
            if key != "@timestamp"}
def insert_row(cursor, log):
    """Insert one parsed log record into `logs` via `cursor` (no commit).

    The source payload is stored as compact JSON with "@timestamp" keys
    removed, so identical queries dedupe cleanly.
    """
    row = (
        log["hostname"],
        log["index_name"],
        int(log["took_millis"]),
        dump_json(drop_timestamp(log["source"])),
    )
    cursor.execute(
        "insert into logs(hostname, index_name, took_millis, source) "
        "values (?, ?, ?, ?)",
        row)
def parse_args(args=None, parse=True):
    """Build the CLI argument parser and optionally parse `args`.

    `args=None` means parse sys.argv; `parse=False` returns the parser
    with a None namespace (useful for help/testing).

    Returns a (parser, namespace-or-None) tuple.
    """
    cli = argparse.ArgumentParser(
        description="Elasticsearch Slow Query Log Parser")
    cli.add_argument("filename", help="Path to log file")
    cli.add_argument("-d", "--database", default=":memory:",
                     help="Path to sqlite3 database")
    parsed = cli.parse_args(args) if parse else None
    return cli, parsed
def main():
    """Parse the slow log named on the CLI into a sqlite3 database.

    One row is inserted per log line; everything is committed in a single
    transaction at the end.
    """
    _parser, args = parse_args()
    conn = sqlite3.connect(args.database)
    conn.row_factory = sqlite3.Row
    try:
        migrate(conn)
        cursor = conn.cursor()
        # Context manager releases the file handle even if a line fails to
        # parse (the original leaked the open file object), and the
        # connection is always closed via the finally.
        with open(args.filename) as log_file:
            for line in log_file:
                insert_row(cursor, log_line.parse(line))
        conn.commit()
    finally:
        conn.close()
# Script entry point guard: only run when executed directly, not on import.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment