Elasticsearch mapping analyzer

#!/usr/bin/env python

import os
import json
import sqlite3
import argparse

from collections import namedtuple, OrderedDict
from operator import attrgetter
from StringIO import StringIO

import requests

STANDARD_FIELDS = (
    ("@timestamp", "date"),
    ("servicetype", "string"),
    ("logname", "string"),
    ("formatversion", "string"),
    ("type", "string"),
    ("useragent", "string"),
    ("host", "string"),
    ("sequencenumber", "long"),
    ("ot-requestid", "string"),
    ("ot-anonymousid", "string"),
    ("ot-userid", "string"),
    ("ot-sessionid", "string"),
    ("ot-referringhost", "string"),
    ("ot-referringservice", "string"),
    ("ot-domain", "string"),
    ("accept-language", "string"),
    ("severity", "string"),
    ("logmessage", "string"),
    ("method", "string"),
    ("url", "string"),
    ("status", "long"),
    ("duration", "long"),
    ("bodysize", "long"),
    ("responsesize", "long"),
)

TypeConfig = namedtuple("TypeConfig",
    ["index_name", "type_name", "property_name", "property_type"])

def group_by(fn, l):
    acc = {}
    for e in l:
        key = fn(e)
        acc.setdefault(key, []).append(e)
    return acc

def freq_by(fn, l):
    # Like group_by, but count the elements per key instead of
    # collecting them.
    groups = group_by(fn, l)
    return {k: len(v) for k, v in groups.iteritems()}

def dict_factory(cursor, row):
    # sqlite3 row factory: map each column name to its value.
    return {col[0]: row[idx] for idx, col in enumerate(cursor.description)}

def parse_indices(index_config):
    # Flatten an Elasticsearch _mapping document into one TypeConfig per
    # (index, type, property) that declares an explicit type.
    return [
        TypeConfig(index, itype, tproperty, pconfigs["type"])
        for index, types in index_config.iteritems()
        for itype, tconfig in types.get("mappings", {}).iteritems()
        for tproperty, pconfigs in tconfig.get("properties", {}).iteritems()
        if "type" in pconfigs
    ]

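# For reference, a minimal sketch of the _mapping document shape that
# parse_indices expects (the type name "request" here is made up):
#
#   {"logstash-2015.07.09": {"mappings": {"request": {"properties": {
#       "status": {"type": "long"}}}}}
#
# which flattens to:
#
#   [TypeConfig(index_name="logstash-2015.07.09", type_name="request",
#               property_name="status", property_type="long")]
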
def init_standard_fields(cursor, rows):
    create_table = """
    CREATE TABLE IF NOT EXISTS standard_fields (
        property_name TEXT,
        property_type TEXT
    )
    """
    insert_sql = """
    INSERT INTO standard_fields (property_name, property_type) VALUES (?, ?)
    """
    cursor.execute(create_table)
    cursor.executemany(insert_sql, rows)

def init_fields(cursor, rows):
    field_sql = ", ".join("{} TEXT".format(f) for f in TypeConfig._fields)
    create_table = "CREATE TABLE IF NOT EXISTS fields ({})".format(field_sql)
    insert_sql = "INSERT INTO fields ({}) VALUES ({})".format(
        ",".join(TypeConfig._fields),
        ",".join(["?"] * len(TypeConfig._fields)))
    cursor.execute(create_table)
    cursor.executemany(insert_sql, rows)

def init_db(db, rows):
    cursor = db.cursor()
    init_standard_fields(cursor, STANDARD_FIELDS)
    init_fields(cursor, rows)
    db.commit()

def load_file(path):
    # Accept either a local JSON file or a URL to fetch (e.g. the
    # Elasticsearch _mapping endpoint).
    if os.path.exists(path):
        with open(path) as f:
            return json.load(f)
    else:
        return requests.get(path).json()

def load_data(filename, db_file=":memory:"):
    db = sqlite3.connect(db_file)
    json_data = load_file(filename)
    field_data = parse_indices(json_data)
    init_db(db, field_data)
    return db

def format_row(headers, row, fmts):
    out = []
    for col, fmt in zip([row[k] for k in headers], fmts):
        out.append(fmt % col)
    return " | ".join(out)

def table(rows, headers=None):
    if headers is None:
        headers = rows[0].keys()
    widths = [max(len(str(k)), *[len(str(r[k])) for r in rows]) for k in headers]
    fmts = [''.join(["%-", str(w), "s"]) for w in widths]
    header = format_row(headers, dict(zip(headers, headers)), fmts)
    bar = "-" * len(header)
    buffer = StringIO()
    buffer.write(bar + "\n")
    buffer.write(header + "\n")
    buffer.write(bar + "\n")
    for row in rows:
        buffer.write(format_row(headers, row, fmts) + "\n")
    buffer.write(bar)
    return buffer.getvalue()

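# A quick sanity check of table() with made-up rows:
#
#   rows = [OrderedDict([("name", "status"), ("type", "long")]),
#           OrderedDict([("name", "duration"), ("type", "long")])]
#   print table(rows)
#
# prints:
#
#   ---------------
#   name     | type
#   ---------------
#   status   | long
#   duration | long
#   ---------------
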
def main():
    parser = argparse.ArgumentParser(
        description="elasticsearch mapping collision")
    parser.add_argument("--db",
        help="if set, file path is a db",
        action="store_true",
        default=False)
    parser.add_argument("-o", "--output-db",
        help="output database file",
        default=":memory:")
    parser.add_argument("file",
        help="path to mapping file")
    parser.add_argument("-q", "--query",
        help="query for mapping db")
    args = parser.parse_args()

    if args.db:
        db = sqlite3.connect(args.file)
    else:
        if os.path.exists(args.output_db):
            raise RuntimeError("output db already exists")
        db = load_data(args.file, args.output_db)

    if args.query:
        db.row_factory = lambda c, r: OrderedDict(zip([e[0] for e in c.description], r))
        cursor = db.cursor()
        print table(cursor.execute(args.query).fetchall())

    return db


if __name__ == "__main__":
    main()

Usage:

➭ python find_collisions.py --help
usage: find_collisions.py [-h] [--db] [-o OUTPUT_DB] [-q QUERY] file

elasticsearch mapping collision

positional arguments:
  file                  path to mapping file

optional arguments:
  -h, --help            show this help message and exit
  --db                  if set, file path is a db
  -o OUTPUT_DB, --output-db OUTPUT_DB
                        output database file
  -q QUERY, --query QUERY
                        query for mapping db
For example, to dump the production mapping into a local database and run a smoke-test query in one step:

➭ python find_collisions.py http://es-logging-usw2.otenv.com:9200/_mapping --query "select * from fields limit 1" -o prod-mapping-20150706.db

The query below then finds mapping collisions: fields whose name matches a standard field case-insensitively, but differs from it in exact case or in declared type:

SELECT DISTINCT
  f.index_name,
  f.type_name,
  f.property_name,
  f.property_type,
  sf.property_name AS official_name,
  sf.property_type AS official_type
FROM fields f
INNER JOIN standard_fields sf
  ON sf.property_name = LOWER(f.property_name)
  AND (sf.property_name != f.property_name
  OR sf.property_type != f.property_type)
WHERE f.index_name = 'logstash-2015.07.09'
ORDER BY f.index_name, f.type_name, f.property_name;
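
Because the database was saved with -o, later queries can be run against it directly via --db, without hitting Elasticsearch again (the query here is just an illustration):

➭ python find_collisions.py --db prod-mapping-20150706.db --query "SELECT property_name, COUNT(*) FROM fields GROUP BY property_name"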