Last active
October 15, 2021 23:59
-
-
Save lelandbatey/64e7c7d3d86b4a1b455a93f593562d68 to your computer and use it in GitHub Desktop.
A tool for examining statistics about JSON object structures. A great way to examine documents in a MongoDB collection for consistency.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #! /usr/bin/env python3 | |
| # Downloaded from here: https://gist.github.com/lelandbatey/64e7c7d3d86b4a1b455a93f593562d68 | |
import argparse
import json
import math
import sys
from codecs import escape_encode
# The ABC aliases (Sequence, Mapping, ...) were removed from `collections` in
# Python 3.10; they have lived in `collections.abc` since Python 3.3.
from collections.abc import Sequence
from operator import itemgetter
| # This implementation for parsing stacked JSON is taken from the following | |
| # Stackoverflow post: | |
| # https://stackoverflow.com/a/50384432 | |
| from json import JSONDecoder, JSONDecodeError | |
| import re | |
NOT_WHITESPACE = re.compile(r'[^\s]')


def decode_stacked(document, pos=0, decoder=JSONDecoder()):
    '''
    Yield every JSON value found in a string of back-to-back JSON documents.

    Values may be separated by any amount of whitespace, including none.

    :param document: text holding zero or more concatenated JSON values
    :param pos: index in `document` at which decoding starts
    :param decoder: the JSONDecoder used to decode each value
    :raises JSONDecodeError: when the text at the next non-whitespace
        position is not valid JSON
    '''
    next_value = NOT_WHITESPACE.search(document, pos)
    while next_value:
        # raw_decode hands back the decoded value and the index just past it,
        # which is where the search for the following value resumes.
        obj, pos = decoder.raw_decode(document, next_value.start())
        yield obj
        next_value = NOT_WHITESPACE.search(document, pos)
def get_json_type(x):
    '''
    Name the JSON type that a decoded Python value corresponds to.

    :param x: any Python value
    :return: one of 'dict', 'bool', 'number', 'string', 'list', 'null', or
        'other <type>' when none of those apply
    '''
    # Order matters: bool is a subclass of int, and str is itself a Sequence,
    # so the more specific check has to come first in both cases.
    type_names = (
        (dict, 'dict'),
        (bool, 'bool'),
        ((int, float, complex), 'number'),
        (str, 'string'),
        (Sequence, 'list'),
    )
    for python_type, json_name in type_names:
        if isinstance(x, python_type):
            return json_name
    if x is None:
        return 'null'
    return "other {}".format(str(type(x)))
| ### | |
| # More complicated HTML color-coded view of a tree | |
| ### | |
def rgb_to_hex(rgb):
    '''
    Format an (r, g, b) triple as an uppercase '#RRGGBB' string.

    :param rgb: indexable holding at least three integers, each 0 - 255
    :return: a string such as '#00429D'
    '''
    # '02X' zero-pads on the left. The earlier '<02X' spec left-aligned the
    # digit and padded on the right, so 10 came out as 'A0' instead of '0A'.
    return '#{:02X}{:02X}{:02X}'.format(rgb[0], rgb[1], rgb[2])
def get_color_on_gradient(colors, progress):
    '''
    Return the color found `progress` of the way along a multi-stop gradient.

    :param colors: list of colors, where each color is a three-part iterable, (r, g, b), with each item being a value from 0 - 255 inclusive. Alternatively, each iterable may be a string formatted as '#000000'.
    :param progress: floating point number from 0.0 to 1.0, inclusive. Values outside that range are clamped to it.
    :return: the last color as an (r, g, b) tuple when progress is 1.0 (or
        when there is only one color), otherwise a blended [r, g, b] list
    :raises ValueError: if `colors` is empty
    '''
    def blend_channel(a, b, x):
        # Interpolates between the squares of the two channel values and
        # takes the square root, rather than interpolating the raw values.
        return int(math.sqrt((1 - x) * (a**2) + x * (b**2)))

    def to_rgb(color):
        # Accept either '#rrggbb' strings or ready-made (r, g, b) sequences.
        if isinstance(color, str):
            return (int(color[1:3], 16), int(color[3:5], 16), int(color[5:7], 16))
        return color

    if not colors:
        raise ValueError('colors must contain at least one color')

    progress = min(max(progress, 0.0), 1.0)
    if progress == 1.0 or len(colors) == 1:
        return tuple(to_rgb(colors[-1]))

    segments = len(colors) - 1
    position = progress * segments
    # Clamp so floating point rounding can never select a segment that starts
    # on the final color (which would index past the end of the list).
    start_idx = min(math.floor(position), segments - 1)
    tween = position - start_idx

    start_color = to_rgb(colors[start_idx])
    end_color = to_rgb(colors[start_idx + 1])
    return [blend_channel(start_color[idx], end_color[idx], tween) for idx in range(3)]
def make_count_tree(path, node):
    '''
    Record one occurrence of `path` in the count tree rooted at `node`.

    path is Jpath.path: a non-empty list whose elements each carry a
    `key_name` and a `val_type`. Every element but the last selects (creating
    it if needed) a nested dict by `key_name`; the last element increments the
    counter stored under its `val_type` inside the dict found at its
    `key_name`.

    :param path: the list of steps to record
    :param node: the dict to update in place
    :return: the updated count for the final step's `val_type`
    '''
    leaf = path[-1]
    cursor = node
    for step in path[:-1]:
        cursor = cursor.setdefault(step.key_name, {})
    tally = cursor.setdefault(leaf.key_name, {})
    tally[leaf.val_type] = tally.get(leaf.val_type, 0) + 1
    return tally[leaf.val_type]
class Jpath:
    '''
    A path through a JSON document together with the type found at its end.

    `path` is the list of steps and `terminal_type` the name of the type at
    the end of the path. Equality, ordering and hashing are all defined in
    terms of repr().
    '''

    def __init__(self):
        self.terminal_type = ''
        self.path = []

    def __repr__(self):
        steps = ', '.join(map(str, self.path))
        return f'[{steps}] "{self.terminal_type}"'

    def __str__(self):
        return repr(self)

    def __eq__(self, other):
        return repr(self) == repr(other)

    def __lt__(self, other):
        return repr(self) < repr(other)

    def __hash__(self):
        return hash(repr(self))
class KeyVal:
    '''
    One step of a Jpath: a key name paired with the name of the type of the
    value stored under that key.
    '''

    def __init__(self):
        self.val_type = ''
        self.key_name = ''

    def __repr__(self):
        return f"({self.key_name!r}, '{self.val_type}')"

    def __str__(self):
        return repr(self)
def desc_list_jpath(l):
    '''
    Collect the distinct Jpaths reachable through the items of a list.

    Every item is recorded under the key 0 regardless of its position, so
    items that share a structure produce a single Jpath. Items that are empty
    dicts or empty lists contribute no Jpath.

    :param l: the list to describe
    :return: sorted list of unique Jpath objects
    '''
    unique_paths = set()
    for item in l:
        item_type = get_json_type(item)
        step = KeyVal()
        step.key_name, step.val_type = 0, item_type

        describe = {'dict': desc_dict_jpath, 'list': desc_list_jpath}.get(item_type)
        if describe is None:
            # A terminal value: the path ends at this step.
            terminal = Jpath()
            terminal.terminal_type = item_type
            tails = [terminal]
        else:
            tails = describe(item)

        for tail in tails:
            full_path = Jpath()
            full_path.path = [step] + tail.path
            full_path.terminal_type = tail.terminal_type
            unique_paths.add(full_path)
    return sorted(unique_paths)
def desc_dict_jpath(d):
    '''
    Build a Jpath for every terminal value reachable from a dict.

    Nested dicts and lists are descended into, and each resulting path is
    prefixed with the key it was found under. Values that are empty dicts or
    empty lists contribute no Jpath.

    :param d: the dict to describe
    :return: sorted list of Jpath objects
    '''
    collected = []
    for key, value in d.items():
        value_type = get_json_type(value)
        step = KeyVal()
        step.key_name, step.val_type = key, value_type

        describe = {'dict': desc_dict_jpath, 'list': desc_list_jpath}.get(value_type)
        if describe is None:
            # A terminal value: the path ends at this step.
            terminal = Jpath()
            terminal.terminal_type = value_type
            tails = [terminal]
        else:
            tails = describe(value)

        for tail in tails:
            full_path = Jpath()
            full_path.path = [step] + tail.path
            full_path.terminal_type = tail.terminal_type
            collected.append(full_path)
    return sorted(collected)
def jtype_repr(typ):
    '''
    Return a sample JSON literal standing in for the named JSON type.

    :param typ: one of 'dict', 'bool', 'number', 'string', 'list', 'null'
    :raises KeyError: for any other name
    '''
    return {
        'dict': '{}',
        'bool': 'false',
        'number': '0',
        'string': '""',
        'list': '[]',
        'null': 'null',
    }[typ]
def is_terminal_type(typ):
    '''
    Tell whether a JSON type name denotes a value with nothing nested in it,
    i.e. anything other than 'dict' or 'list'.
    '''
    return typ not in ('dict', 'list')
TERMINAL_TYPES = {'bool', 'number', 'string', 'null'}


def dict_represents_terminal_type(d):
    '''
    Tell whether every key of `d` is the name of a terminal JSON type.

    An empty dict counts as True.
    '''
    return TERMINAL_TYPES.issuperset(d.keys())
def print_count_tree(node, most, colors, indent=0, results_file=None):
    '''
    Write a count tree, as built by make_count_tree, as JSON-like text with
    HTML color spans.

    Each terminal value is wrapped in a <span> whose background color is
    picked from `colors` by comparing the value's count with `most`.

    :param node: dict of key -> sub-dict. A node containing the key 0 is
        printed as a list, any other node as an object.
    :param most: the count that maps to the first color of the gradient
    :param colors: the gradient handed to get_color_on_gradient
    :param indent: nesting depth, used to indent the output
    :param results_file: file-like object to write to; defaults to sys.stdout
    '''
    # Function-scope import so this block does not depend on the file header.
    from html import escape

    if results_file is None:
        results_file = sys.stdout

    def open_span(count):
        percent = 1 - (count / most)
        cur_color = rgb_to_hex(get_color_on_gradient(colors, percent))
        return '<span style="background-color: {}">'.format(cur_color)

    brace_indent = ' ' * indent
    key_indent = ' ' * (indent + 1)
    # List items are recorded under the integer key 0, so its presence marks
    # this node as describing a list.
    is_list = 0 in node

    # The opening bracket must go to results_file like everything else; it
    # used to be written to stdout unconditionally.
    print('[' if is_list else '{', file=results_file)

    # NOTE(review): this function was recovered from a copy that had lost its
    # indentation. The comma/newline handling below uses the nesting that
    # yields well-formed output -- confirm against the upstream gist.
    if is_list:
        for v in node.values():
            if not isinstance(v, dict):
                # A bare count left by documents where this key held a scalar
                # instead of a list; there is no list item to print for it.
                continue
            childvals = list(v.items())
            for child_idx, (ck, cv) in enumerate(childvals):
                if is_terminal_type(get_json_type(cv)):
                    # cv is a count and ck the name of the terminal type.
                    print(
                        '{}{}{}{}'.format(open_span(cv), key_indent, jtype_repr(ck), '</span>'),
                        file=results_file,
                    )
                else:
                    print(key_indent, end='', file=results_file)
                    print_count_tree({ck: cv}, most, colors, indent + 1, results_file)
                    if child_idx + 1 < len(childvals):
                        print(',', end='', file=results_file)
                    print(file=results_file)
    else:
        # Entries such as 'null': 3 are bare counts, not keys. Drop them up
        # front so the last printed entry is never followed by a comma.
        entries = [
            (k, v) for k, v in node.items()
            if not (get_json_type(v) in TERMINAL_TYPES and k in TERMINAL_TYPES)
        ]
        for idx, (k, v) in enumerate(entries):
            # Keys come straight from the input documents; escape them so
            # they cannot inject markup into the HTML report.
            key_text = escape(str(k), quote=False)
            if dict_represents_terminal_type(v):
                # Only the first recorded type of the key is shown.
                typ = list(v.keys())[0]
                print('{}{}"{}": '.format(open_span(v[typ]), key_indent, key_text), end='', file=results_file)
                print("{}</span>".format(jtype_repr(typ)), end='', file=results_file)
            else:
                print('{}"{}": '.format(key_indent, key_text), end='', file=results_file)
                print_count_tree(v, most, colors, indent + 1, results_file)
            if idx + 1 < len(entries):
                print(',', file=results_file)
            else:
                print(file=results_file)

    print('{}{}'.format(brace_indent, ']' if is_list else '}'), end='', file=results_file)
| ### | |
| # Implementing the very basic "frequency path" raw text output | |
| ### | |
def describe_list(l, parent_name=None):
    '''
    Describe the distinct shapes of the items in a list as path strings.

    Every item is reported under the index placeholder ["0"], so items that
    share a shape collapse into a single entry.

    :param l: the list to describe
    :param parent_name: accepted but not used
    :return: sorted list of unique path strings
    '''
    found = set()
    for item in l:
        item_type = get_json_type(item)
        if item_type == 'dict':
            found.update(f'["0"].{sub}' for sub in describe_keys(item))
        elif item_type == 'list':
            found.update(f'["0"]{sub}' for sub in describe_list(item))
        else:
            found.add(f'["0"] ({item_type})')
    return sorted(found)
def describe_keys(d, parent_key=None):
    '''
    Describe every terminal value reachable from a dict as a path string.

    Nested dicts and lists are flattened into dotted paths; each terminal
    value ends its path with its JSON type in parentheses.

    :param d: the dict to describe
    :param parent_key: accepted but not used
    :return: list of path strings, in the dict's iteration order
    '''
    described = []
    for k, v in d.items():
        value_type = get_json_type(v)
        if value_type == 'dict':
            nested = describe_keys(v, k)
        elif value_type == 'list':
            nested = describe_list(v, k)
        else:
            described.append(f"{k} ({value_type})")
            continue
        described.extend(f'{k}.{sub}' for sub in nested)
    return described
def find_common_keys(docs):
    '''
    Count how often each described key occurs across a collection of documents.

    The result maps every key string produced by describe_keys for any
    document in 'docs' to the number of times it was produced.

    :params docs: a list of dictionaries
    :return: dictionary of string->int
    '''
    tally = {}
    for doc in docs:
        for key in describe_keys(doc):
            tally[key] = tally.get(key, 0) + 1
    return tally
def main():
    '''
    Command line entry point.

    Reads stacked JSON documents (or a single JSON list of documents) from
    the given path or stdin, then prints either a key-frequency listing or,
    with --html-pretty-print, a color-coded HTML view of the combined
    structure.
    '''
    # The text must be passed as `description`; given positionally it becomes
    # `prog` and replaces the program name in the usage line.
    parser = argparse.ArgumentParser(
        description=(
            "Shows stats info on JSON object schemas. "
            "Great for reverse engineering the structure of a poorly maintained Mongo database."
        )
    )
    parser.add_argument('input', help="Path to read from. Default is stdin ('-')", default='-', nargs='?')
    parser.add_argument(
        '--html-pretty-print',
        help="Prints results as a color-coded JSON schema-style object",
        default=argparse.SUPPRESS,
        action='store_true'
    )
    args = parser.parse_args()

    # We don't handle streaming JSON, as that'd be way too difficult
    if args.input == '-':
        data = sys.stdin.read()
    else:
        # Close the file as soon as it has been read.
        with open(args.input, 'r') as infile:
            data = infile.read()

    docs = list(decode_stacked(data))
    # Handle the case where the series of json objects is correctly nested
    # within a list. The check has to look at the decoded value itself:
    # `docs` is always a list, so testing it would also unwrap a single
    # top-level object into its keys.
    if len(docs) == 1 and get_json_type(docs[0]) == 'list':
        docs = list(docs[0])

    # The flag uses default=SUPPRESS, so the attribute only exists when the
    # option was given on the command line.
    if 'html_pretty_print' in args:
        jtrees = []
        for doc in docs:
            jtrees += desc_dict_jpath(doc)
        most = 0
        root = {}
        for j in jtrees:
            most = max(most, make_count_tree(j.path, root))
        colors = ["#00429d", "#3aa794", "#dfdfc1", "#ff005e", "#93003a"]
        print('<!DOCTYPE html><html>')
        print('<body>')
        # Create a nice little table showing the order of the color gradient
        print('<table style="width: 800px; table-layout: fixed;"><tr>')
        for idx, _ in enumerate(colors):
            if idx == 0:
                print('<th style="text-align: left;">Used in 100% of Objects</th>')
            elif idx == len(colors) - 1:
                print('<th style="text-align: right;">Used in 0% of Objects</th>')
            else:
                print('<th></th>')
        print('</tr><tr>')
        for c in colors:
            # The opening <td> tag was previously left unterminated.
            print('<td style="background-color: {}; height: 50px;"></td>'.format(c))
        print('</tr></table>')
        print('<pre style="background-color: grey; color: white;">')
        print_count_tree(root, most, colors)
        # Close <body> rather than opening a second one.
        print('</pre></body></html>')
    else:
        key_frequency = find_common_keys(docs)
        # Least frequent first; keys with equal counts in alphabetical order.
        for k, count in sorted(key_frequency.items(), key=itemgetter(1, 0)):
            print(f"{count:<7} {k}")


if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment