Skip to content

Instantly share code, notes, and snippets.

@lelandbatey
Last active October 15, 2021 23:59
Show Gist options
  • Save lelandbatey/64e7c7d3d86b4a1b455a93f593562d68 to your computer and use it in GitHub Desktop.
Save lelandbatey/64e7c7d3d86b4a1b455a93f593562d68 to your computer and use it in GitHub Desktop.
A tool for examining statistics about JSON object structures. A great way to examine documents in a MongoDB collection for consistency.
#! /usr/bin/env python3
# Downloaded from here: https://gist.github.com/lelandbatey/64e7c7d3d86b4a1b455a93f593562d68
import argparse
import json
import math
import sys
from codecs import escape_encode
from operator import itemgetter

try:
    # The abstract base classes live in collections.abc; the aliases in
    # `collections` were removed in Python 3.10.
    from collections.abc import Sequence
except ImportError:
    from collections import Sequence
# This implementation for parsing stacked JSON is taken from the following
# Stackoverflow post:
# https://stackoverflow.com/a/50384432
from json import JSONDecoder, JSONDecodeError
import re
NOT_WHITESPACE = re.compile(r'[^\s]')


def decode_stacked(document, pos=0, decoder=JSONDecoder()):
    '''
    Lazily yield every JSON value found back-to-back in `document`.

    :param document: string holding zero or more concatenated JSON values
    :param pos: index in `document` at which scanning starts
    :param decoder: object whose `raw_decode(document, index)` returns a
        `(value, end_index)` pair

    Whitespace before each value is skipped. Iteration stops once nothing but
    whitespace remains. A JSONDecodeError raised by `raw_decode` is not
    handled here and propagates to the caller.
    '''
    cursor = pos
    found = NOT_WHITESPACE.search(document, cursor)
    while found is not None:
        # Decode one value starting at the first non-whitespace character and
        # continue from wherever the decoder stopped.
        value, cursor = decoder.raw_decode(document, found.start())
        yield value
        found = NOT_WHITESPACE.search(document, cursor)
def get_json_type(x):
    '''
    Return the name of the JSON-style type of `x`.

    The result is one of 'dict', 'bool', 'number', 'string', 'list' or
    'null'; any other value yields "other <type>" built from `type(x)`.
    '''
    # Order matters: bool is a subclass of int, and str is itself a Sequence,
    # so the more specific check has to come first in both cases.
    ordered_checks = (
        (dict, 'dict'),
        (bool, 'bool'),
        ((int, float, complex), 'number'),
        (str, 'string'),
        (Sequence, 'list'),
    )
    for python_types, label in ordered_checks:
        if isinstance(x, python_types):
            return label
    if x is None:
        return 'null'
    return "other {}".format(str(type(x)))
###
# More complicated HTML color-coded view of a tree
###
def rgb_to_hex(rgb):
    '''
    Format an RGB triple as an uppercase '#RRGGBB' string.

    :param rgb: indexable holding at least three integer channel values; only
        the first three are used
    :return: string such as '#050AFF'
    '''
    red, green, blue = rgb[0], rgb[1], rgb[2]
    # '{:02X}' pads on the LEFT with zeros. The previous spec '{:<02X}'
    # left-aligned the digits and padded on the right, so 5 became '50'
    # instead of '05'.
    return '#{:02X}{:02X}{:02X}'.format(red, green, blue)
def get_color_on_gradient(colors, progress):
    '''
    Pick the color found `progress` of the way along a multi-stop gradient.

    :param colors: list of colors, where each color is a three-part iterable, (r, g, b), with each item being a value from 0 - 255 inclusive. Alternatively, each color may be a string formatted as '#000000'. Both forms may be mixed in one list.
    :param progress: floating point number from 0.0 to 1.0, inclusive. Values below 0.0 are treated as 0.0 and values above 1.0 as 1.0.
    :return: the blended color as a list [r, g, b] of ints. When `progress` is at (or past) the end of the gradient, or the gradient has a single stop, the last color is returned unblended as a tuple (r, g, b).
    :raises IndexError: if `colors` is empty
    '''
    def to_rgb(color):
        # Accept '#rrggbb' strings as well as ready-made (r, g, b) sequences.
        if isinstance(color, str):
            return (int(color[1:3], 16), int(color[3:5], 16), int(color[5:7], 16))
        return (color[0], color[1], color[2])

    def blend_channel(a, b, x):
        # Interpolate the squared channel values and take the root of the
        # result, rather than interpolating the raw values linearly.
        return int(math.sqrt((1 - x) * (a**2) + x * (b**2)))

    final_color = to_rgb(colors[-1])
    if len(colors) == 1 or progress >= 1.0:
        return final_color
    progress = max(progress, 0.0)

    segments = len(colors) - 1
    scaled = progress * segments
    # Clamp the segment index so floating point rounding just below 1.0 can
    # never select a segment past the end of the list.
    start_idx = min(math.floor(scaled), segments - 1)
    within_segment = scaled - start_idx

    c1 = to_rgb(colors[start_idx])
    c2 = to_rgb(colors[start_idx + 1])
    return [blend_channel(a, b, within_segment) for a, b in zip(c1, c2)]
def make_count_tree(path, node):
    '''
    Record one occurrence of `path` in the nested counting dict `node`.

    :param path: non-empty list of objects exposing `key_name` and `val_type`
        (the list stored in Jpath.path)
    :param node: dict that is updated in place; one nested dict is created per
        path element, keyed by `key_name`
    :return: the updated count stored under the last element's `val_type`
    '''
    final_step = path[-1]
    cursor = node
    # Walk (creating as needed) one nested dict per intermediate path element.
    for step in path[:-1]:
        cursor = cursor.setdefault(step.key_name, dict())
    leaf_counts = cursor.setdefault(final_step.key_name, dict())
    leaf_counts[final_step.val_type] = leaf_counts.get(final_step.val_type, 0) + 1
    return leaf_counts[final_step.val_type]
class Jpath:
    '''
    A path through a JSON document plus the type found at its end.

    Equality, ordering and hashing are all derived from the repr() text, so
    two instances with the same rendered path and terminal type are
    interchangeable in sets and sorts.
    '''

    def __init__(self):
        # Elements making up the path, in order from the document root.
        self.path = []
        # Type name of the value the path ends at.
        self.terminal_type = ''

    def __repr__(self):
        rendered_steps = ', '.join(str(step) for step in self.path)
        return '[{}] "{}"'.format(rendered_steps, self.terminal_type)

    def __str__(self):
        return repr(self)

    def __eq__(self, other):
        return repr(self) == repr(other)

    def __lt__(self, other):
        return repr(self) < repr(other)

    def __hash__(self):
        return hash(repr(self))
class KeyVal:
    '''One step of a Jpath: a key name paired with the type name of its value.'''

    def __init__(self):
        # Key under which the value was found.
        self.key_name = ''
        # Type name of the value stored under that key.
        self.val_type = ''

    def __repr__(self):
        # The key is rendered with repr() so string keys keep their quotes and
        # stay distinguishable from non-string keys.
        return "({!r}, '{}')".format(self.key_name, self.val_type)

    def __str__(self):
        return repr(self)
def desc_list_jpath(l):
    '''
    Describe every distinct path reachable through the elements of list `l`.

    Each element contributes a KeyVal step whose key_name is 0, so all
    elements share one slot. Dicts and lists are descended into; any other
    element ends the path with its own type. Duplicate paths are collapsed.

    :return: sorted list of Jpath objects
    '''
    unique_paths = set()
    for element in l:
        element_type = get_json_type(element)

        head = KeyVal()
        head.key_name = 0
        head.val_type = element_type

        if element_type == 'dict':
            tails = desc_dict_jpath(element)
        elif element_type == 'list':
            tails = desc_list_jpath(element)
        else:
            leaf = Jpath()
            leaf.terminal_type = element_type
            tails = [leaf]

        # Prefix every sub-path with this element's step.
        for tail in tails:
            joined = Jpath()
            joined.path = [head, *tail.path]
            joined.terminal_type = tail.terminal_type
            unique_paths.add(joined)
    return sorted(unique_paths)
def desc_dict_jpath(d):
    '''
    Describe every path reachable from the keys of dict `d`.

    Each key contributes a KeyVal step holding the key and the type of its
    value. Dicts and lists are descended into; any other value ends the path
    with its own type.

    :return: sorted list of Jpath objects
    '''
    collected = []
    for field_name, field_value in d.items():
        field_type = get_json_type(field_value)

        head = KeyVal()
        head.key_name = field_name
        head.val_type = field_type

        if field_type == 'dict':
            tails = desc_dict_jpath(field_value)
        elif field_type == 'list':
            tails = desc_list_jpath(field_value)
        else:
            leaf = Jpath()
            leaf.terminal_type = field_type
            tails = [leaf]

        # Prefix every sub-path with this key's step.
        for tail in tails:
            joined = Jpath()
            joined.path = [head, *tail.path]
            joined.terminal_type = tail.terminal_type
            collected.append(joined)
    collected.sort()
    return collected
def jtype_repr(typ):
    '''
    Return a sample JSON literal standing in for the type name `typ`.

    :raises KeyError: if `typ` is not one of the six known type names
    '''
    placeholders = dict((
        ('dict', '{}'),
        ('bool', 'false'),
        ('number', '0'),
        ('string', '""'),
        ('list', '[]'),
        ('null', 'null'),
    ))
    return placeholders[typ]
def is_terminal_type(typ):
    '''Return True unless `typ` names a container type ('dict' or 'list').'''
    return typ not in ('dict', 'list')
TERMINAL_TYPES = {'bool', 'number', 'string', 'null'}


def dict_represents_terminal_type(d):
    '''
    Return True when every key of `d` is one of TERMINAL_TYPES.

    An empty dict also yields True, since it has no key outside the set.
    '''
    return TERMINAL_TYPES.issuperset(d.keys())
def print_count_tree(node, most, colors, indent=0, results_file=None):
    '''
    Write a count tree as JSON-like text with HTML color-coded spans.

    :param node: nested dict of the shape built by make_count_tree. A node
        that contains the key 0 is rendered as a list ('[' ... ']'),
        anything else as an object ('{' ... '}').
    :param most: the largest count in the tree; every count is divided by it
        to choose a position on the gradient
    :param colors: gradient stops handed to get_color_on_gradient
    :param indent: nesting depth; one space of indentation per level
    :param results_file: file-like object to write to, defaults to sys.stdout

    All output, including the opening bracket, goes to `results_file`. The
    closing bracket is written without a trailing newline so the caller can
    decide whether a comma follows it.
    '''
    import html  # only needed here, for escaping document keys

    if results_file is None:
        results_file = sys.stdout

    def open_heat_span(count):
        # The more often a key/type was seen, the closer to the start of the
        # gradient its background color is.
        percent = 1 - (count / most)
        cur_color = rgb_to_hex(get_color_on_gradient(colors, percent))
        return '<span style="background-color: {}">'.format(cur_color)

    brace_indent = ' ' * indent
    key_indent = ' ' * (indent + 1)
    is_list = 0 in node

    # The index of each item is compared against len(items) below so the comma
    # can be left off after the last one.
    items = list(node.items())

    print('[' if is_list else '{', file=results_file)

    for idx, (k, v) in enumerate(items):
        if is_list:
            if not isinstance(v, dict):
                # A bare count sitting next to the list slot (the same key
                # held a list in some documents and a plain value in others)
                # has no children to render; skip it the same way the object
                # branch skips such entries.
                continue
            childvals = list(v.items())
            for child_idx, (ck, cv) in enumerate(childvals):
                if is_terminal_type(get_json_type(cv)):
                    # `cv` is a plain count, so `ck` is the element's type name.
                    print(
                        '{}{}{}{}'.format(open_heat_span(cv), key_indent, jtype_repr(ck), '</span>'),
                        file=results_file
                    )
                else:
                    # `cv` is a subtree. Wrapping it under its own key renders
                    # it as a nested list when ck == 0, otherwise as an object
                    # with that single key.
                    print(key_indent, end='', file=results_file)
                    print_count_tree({ck: cv}, most, colors, indent + 1, results_file)
                    if child_idx + 1 < len(childvals):
                        print(',', end='', file=results_file)
                    print(file=results_file)
        else:
            typ = get_json_type(v)
            if typ in TERMINAL_TYPES and k in TERMINAL_TYPES:
                # A bare type-name -> count entry has no key of its own to show.
                continue
            # Keys come from the documents, so escape them before writing
            # them into HTML.
            safe_key = html.escape(str(k), quote=False)
            if dict_represents_terminal_type(v):
                # Only the first recorded type of the key is displayed.
                typ = list(v.keys())[0]
                count = v[typ]
                print('{}{}"{}": '.format(open_heat_span(count), key_indent, safe_key), end='', file=results_file)
                print("{}</span>".format(jtype_repr(typ)), end='', file=results_file)
            else:
                print('{}"{}": '.format(key_indent, safe_key), end='', file=results_file)
                print_count_tree(v, most, colors, indent + 1, results_file)
            if idx + 1 < len(items):
                print(',', file=results_file)
            else:
                print(file=results_file)

    closer = ']' if is_list else '}'
    print('{}{}'.format(brace_indent, closer), end='', file=results_file)
###
# Implementing the very basic "frequency path" raw text output
###
def describe_list(l, parent_name=None):
    '''
    Describe the distinct shapes of the elements of list `l`.

    Every element is reported under the fixed index text '["0"]': nested
    lists and dicts contribute their own descriptions appended to it, any
    other element contributes '["0"] (<type>)'. `parent_name` is accepted but
    not used.

    :return: sorted list of unique description strings
    '''
    found = set()
    for element in l:
        element_type = get_json_type(element)
        if element_type == 'list':
            found.update(f'["0"]{sub}' for sub in describe_list(element))
        elif element_type == 'dict':
            found.update(f'["0"].{sub}' for sub in describe_keys(element))
        else:
            found.add(f'["0"] ({element_type})')
    return sorted(found)
def describe_keys(d, parent_key=None):
    '''
    Describe every key path in dict `d` as a dotted string.

    Nested dicts and lists contribute '<key>.<sub-description>' entries; any
    other value contributes '<key> (<type>)'. `parent_key` is accepted but
    not used.

    :return: list of description strings in the iteration order of `d`
    '''
    described = []
    for name, value in d.items():
        if isinstance(value, dict):
            described.extend(f'{name}.{sub}' for sub in describe_keys(value, name))
            continue
        value_type = get_json_type(value)
        if value_type == 'list':
            described.extend(f'{name}.{sub}' for sub in describe_list(value, name))
        else:
            described.append(f"{name} ({value_type})")
    return described
def find_common_keys(docs):
    '''
    Count how often each described key occurs across `docs`.

    :param docs: a list of dictionaries
    :return: dict mapping each string produced by describe_keys to the number
        of times it was produced over all of `docs`
    '''
    tally = {}
    for document in docs:
        for described in describe_keys(document):
            tally[described] = tally.get(described, 0) + 1
    return tally
def _load_documents(filepath):
    '''Read all stacked JSON documents from `filepath` ('-' means stdin).'''
    # We don't handle streaming JSON, as that'd be way too difficult
    if filepath == '-':
        data = sys.stdin.read()
    else:
        # The context manager closes the file once it has been read.
        with open(filepath, 'r') as infile:
            data = infile.read()
    docs = list(decode_stacked(data))
    # Handle the case where the series of json objects is correctly nested
    # within a list. The check has to look at the single decoded value:
    # `docs` itself is always a list, and unwrapping a lone JSON object would
    # replace it with its keys.
    if len(docs) == 1 and get_json_type(docs[0]) == 'list':
        docs = list(docs[0])
    return docs


def _print_html_report(docs):
    '''Print the color-coded HTML page describing `docs` to stdout.'''
    jtrees = list()
    for doc in docs:
        jtrees += desc_dict_jpath(doc)
    most = 0
    root = dict()
    for j in jtrees:
        most = max(most, make_count_tree(j.path, root))
    colors = ["#00429d", "#3aa794", "#dfdfc1", "#ff005e", "#93003a"]
    print('<!DOCTYPE html><html>')
    print('<body>')
    # Create a nice little table showing the order of the color gradient
    print('<table style="width: 800px; table-layout: fixed;"><tr>')
    for idx, _ in enumerate(colors):
        if idx == 0:
            print('<th style="text-align: left;">Used in 100% of Objects</th>')
        elif idx == len(colors) - 1:
            print('<th style="text-align: right;">Used in 0% of Objects</th>')
        else:
            print('<th></th>')
    print('</tr><tr>')
    for c in colors:
        print('<td style="background-color: {}; height: 50px;"></td>'.format(c))
    print('</tr></table>')
    print('<pre style="background-color: grey; color: white;">')
    print_count_tree(root, most, colors)
    print('</pre></body></html>')


def _print_frequency_report(docs):
    '''Print one "<count> <key>" line per described key, rarest first.'''
    key_frequency = find_common_keys(docs)
    # Order by count, breaking ties alphabetically by key.
    for k, count in sorted(key_frequency.items(), key=itemgetter(1, 0)):
        print(f"{count:<7} {k}")


def main():
    '''
    Command line entry point.

    Reads JSON documents from the path given as the optional positional
    argument (stdin when omitted or '-') and prints either the plain
    key-frequency listing or, with --html-pretty-print, the HTML report.
    '''
    parser = argparse.ArgumentParser(
        # Passed by keyword: the first positional parameter of ArgumentParser
        # is `prog`, not the description.
        description=(
            "Shows stats info on JSON object schemas. "
            "Great for reverse engineering the structure of a poorly maintained Mongo database."
        )
    )
    parser.add_argument('input', help="Path to read from. Default is stdin ('-')", default='-', nargs='?')
    parser.add_argument(
        '--html-pretty-print',
        help="Prints results as a color-coded JSON schema-style object",
        action='store_true'
    )
    args = parser.parse_args()
    docs = _load_documents(args.input)
    if args.html_pretty_print:
        _print_html_report(docs)
    else:
        _print_frequency_report(docs)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment