#!/usr/bin/env python
"""Load Elasticsearch index mappings into SQLite to hunt for type collisions."""

import os
import json
import sqlite3
import argparse
from collections import namedtuple, OrderedDict
from io import StringIO

import requests
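
# Typical invocations (script and file names here are illustrative, not fixed):
#
#   python collisions.py http://localhost:9200/_mapping -o mappings.db
#   python collisions.py --db mappings.db -q "
#       SELECT property_name, COUNT(DISTINCT property_type) AS types
#       FROM fields GROUP BY property_name HAVING types > 1"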


STANDARD_FIELDS = (
    ("@timestamp", "date"),
    ("servicetype", "string"),
    ("logname", "string"),
    ("formatversion", "string"),
    ("type", "string"),
    ("useragent", "string"),
    ("host", "string"),
    ("sequencenumber", "long"),
    ("ot-requestid", "string"),
    ("ot-anonymousid", "string"),
    ("ot-userid", "string"),
    ("ot-sessionid", "string"),
    ("ot-referringhost", "string"),
    ("ot-referringservice", "string"),
    ("ot-domain", "string"),
    ("accept-language", "string"),
    ("severity", "string"),
    ("logmessage", "string"),
    ("method", "string"),
    ("url", "string"),
    ("status", "long"),
    ("duration", "long"),
    ("bodysize", "long"),
    ("responsesize", "long"),
)


# One row per (index, type, property) triple found in the mappings.
TypeConfig = namedtuple("TypeConfig",
                        ["index_name", "type_name", "property_name", "property_type"])


def group_by(fn, l):
    """Group the elements of l into lists keyed by fn(element)."""
    acc = {}
    for e in l:
        key = fn(e)
        acc.setdefault(key, []).append(e)
    return acc


def freq_by(fn, l):
    """Count the elements of l keyed by fn(element)."""
    groups = group_by(fn, l)
    return {k: len(v) for k, v in groups.items()}
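
# For example, freq_by(len, ["a", "bb", "cc"]) returns {1: 1, 2: 2}.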


def dict_factory(cursor, row):
    """sqlite3 row factory: map column names to values, preserving column order."""
    return OrderedDict((col[0], row[i]) for i, col in enumerate(cursor.description))


def parse_indices(index_config):
    """Flatten mapping JSON into one TypeConfig per typed property."""
    return [
        TypeConfig(index, itype, tproperty, pconfigs["type"])
        for index, types in index_config.items()
        for itype, tconfig in types.get("mappings", {}).items()
        for tproperty, pconfigs in tconfig.get("properties", {}).items()
        if "type" in pconfigs
    ]
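
# The mapping JSON is assumed to nest index -> mappings -> type -> properties,
# as the _mapping API returns it; the names below are made up for illustration:
#
#   {"logs-2015.01.01":
#       {"mappings":
#           {"nginx":
#               {"properties":
#                   {"status": {"type": "long"}}}}}}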


def init_standard_fields(cursor, rows):
    """Create and populate the standard_fields reference table."""
    create_table = """
    CREATE TABLE IF NOT EXISTS standard_fields (
        property_name TEXT,
        property_type TEXT
    )
    """

    insert_sql = """
    INSERT INTO standard_fields (property_name, property_type) VALUES (?, ?)
    """

    cursor.execute(create_table)
    cursor.executemany(insert_sql, rows)


def init_fields(cursor, rows):
    """Create the fields table, one TEXT column per TypeConfig field, and load rows."""
    field_sql = ", ".join("{} TEXT".format(f) for f in TypeConfig._fields)
    create_table = "CREATE TABLE IF NOT EXISTS fields ({})".format(field_sql)

    insert_sql = "INSERT INTO fields ({}) VALUES ({})".format(
        ",".join(TypeConfig._fields),
        ",".join(["?"] * len(TypeConfig._fields)))
    cursor.execute(create_table)
    cursor.executemany(insert_sql, rows)


def init_db(db, rows):
    """Create both tables and commit."""
    cursor = db.cursor()
    init_standard_fields(cursor, STANDARD_FIELDS)
    init_fields(cursor, rows)
    db.commit()


def load_file(path):
    """Read mapping JSON from a local file, or fetch it over HTTP."""
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    return requests.get(path).json()


def load_data(filename, db_file=":memory:"):
    """Parse a mapping file (or URL) and load it into a new SQLite database."""
    db = sqlite3.connect(db_file)
    json_data = load_file(filename)
    field_data = parse_indices(json_data)
    init_db(db, field_data)
    return db
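
# For example (the file name is hypothetical):
#   db = load_data("mapping.json")
#   print(db.execute("SELECT COUNT(*) FROM fields").fetchone()[0])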


def format_row(headers, row, fmts):
    """Render one row as a ' | '-separated line using the given %-format strings."""
    return " | ".join(fmt % row[k] for k, fmt in zip(headers, fmts))


def table(rows, headers=None):
    """Render a list of dict rows as a fixed-width text table."""
    if headers is None:
        headers = list(rows[0].keys())

    widths = [max(len(str(k)), *(len(str(r[k])) for r in rows)) for k in headers]
    fmts = ["%-{}s".format(w) for w in widths]
    header = format_row(headers, dict(zip(headers, headers)), fmts)
    bar = "-" * len(header)

    buffer = StringIO()
    buffer.write(bar + "\n")
    buffer.write(header + "\n")
    buffer.write(bar + "\n")
    for row in rows:
        buffer.write(format_row(headers, row, fmts) + "\n")
    buffer.write(bar)

    return buffer.getvalue()
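
# Example rendering (data is illustrative):
#
#   -----------------------------
#   property_name | property_type
#   -----------------------------
#   status        | long
#   -----------------------------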


def main():
    parser = argparse.ArgumentParser(
        description="elasticsearch mapping collision")
    parser.add_argument("--db",
                        help="if set, file path is a db",
                        action="store_true",
                        default=False)
    parser.add_argument("-o", "--output-db",
                        help="output database file",
                        default=":memory:")
    parser.add_argument("file",
                        help="path to mapping file")
    parser.add_argument("-q", "--query",
                        help="query for mapping db")

    args = parser.parse_args()

    if args.db:
        db = sqlite3.connect(args.file)
    else:
        if os.path.exists(args.output_db):
            raise RuntimeError("output db already exists")
        db = load_data(args.file, args.output_db)

    if args.query:
        db.row_factory = dict_factory
        cursor = db.cursor()
        print(table(cursor.execute(args.query).fetchall()))

    return db


if __name__ == "__main__":
    main()