Last active
September 21, 2017 20:55
-
-
Save gdbassett/72ed43fa4168596442850e684e0194d7 to your computer and use it in GitHub Desktop.
Function to convert a JSON schema (here, the Vega-Lite schema) into a networkx directed graph
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import inspect
import json
import logging
import os

import networkx as nx  # NOTE: written against dev networkx 2.0
# Attach a DEBUG-level file handler to the root logger.
logger = logging.getLogger()
# logging.FileHandler passes the path straight to open(), which does not
# perform shell-style "~" expansion, so expand it explicitly.
fileLogger = logging.FileHandler(os.path.expanduser("~/Documents/Development/tmp/vega.log"))
fileLogger.setLevel(logging.DEBUG)
logger.addHandler(fileLogger)
class objdict(dict):
    """A dict whose items can be read, set and deleted as attributes.

    Attribute names may be dotted paths: ``getattr(obj, "a.b.c")`` is
    resolved as ``obj["a"]["b"]["c"]``. Any step of the path that cannot be
    resolved is reported as ``AttributeError``.
    """

    # Failures that item access along a path can raise: a missing key, a bad
    # sequence index, or an intermediate value that is not subscriptable.
    # Anything else (KeyboardInterrupt, SystemExit, ...) must propagate
    # untouched instead of being disguised as AttributeError.
    _LOOKUP_ERRORS = (KeyError, IndexError, TypeError)

    def __getattr__(self, name):
        """Return the item stored at the (possibly dotted) path *name*."""
        try:
            return self.recursive_getattr(self, name)
        except self._LOOKUP_ERRORS:
            raise AttributeError("No such attribute: " + name)

    def __setattr__(self, name, value):
        """Store *value* at the (possibly dotted) path *name*."""
        try:
            self.recursive_setattr(self, name, value)
        except self._LOOKUP_ERRORS:
            raise AttributeError("No such attribute: " + name)

    def __delattr__(self, name):
        """Delete the item stored at the (possibly dotted) path *name*."""
        try:
            self.recursive_delattr(self, name)
        except self._LOOKUP_ERRORS:
            raise AttributeError("No such attribute: " + name)

    def recursive_setattr(self, o, name, value):
        """Set ``o[...]`` along the dotted path *name*; parents must exist."""
        name = name.split(".", 1)
        if len(name) > 1:
            self.recursive_setattr(o[name[0]], name[1], value)
        else:
            o[name[0]] = value

    def recursive_getattr(self, o, name):
        """Return the value found by following the dotted path *name* in *o*."""
        name = name.split(".", 1)
        if len(name) > 1:
            return self.recursive_getattr(o[name[0]], name[1])
        else:
            return o[name[0]]

    def recursive_delattr(self, o, name):
        """Delete the entry found by following the dotted path *name* in *o*."""
        name = name.split(".", 1)
        if len(name) > 1:
            self.recursive_delattr(o[name[0]], name[1])
        else:
            del o[name[0]]
def _follow_properties(schema, properties, name, g, relationship):
    """Add one child node per key of *properties* and recurse into each value."""
    try:
        # dict.items() exists on Python 2 and 3; dict.iteritems() is Python 2 only.
        for k, v in properties.items():
            child = name + "." + k
            g.add_node(child)
            g.add_edge(name, child, relationship=relationship)
            print("Following " + name + " to " + child)
            keynames(schema, v, child, g)
    except Exception:
        logging.error(properties.keys())
        raise


def keynames(schema, d, name, g, relationship = "normal"):
    """Recursively record the key structure of a JSON-schema fragment in a graph.

    Parameters:
        schema: the whole schema; ``$ref`` targets are looked up on it with
            ``getattr(schema, "dotted.path")``.
        d: the schema fragment to process. Anything that is not a dict is
            ignored (only the progress message is printed).
        name: graph node name of ``d``; children are named ``name + "." + key``.
        g: graph object providing ``add_node``, ``add_edge`` and ``successors``.
        relationship: value stored in the ``relationship`` attribute of the
            edges created directly from ``name`` by this call.

    Returns:
        ``g``, the same graph object that was passed in.

    Any exception is re-raised after printing ``name`` so the failing path
    is visible in the output.
    """
    try:
        if not isinstance(d, dict):
            # Non-dict fragments carry no schema keywords to follow.
            print("Processing " + name + " for enumerations")
            return g

        # Schema combinators: sub-schemas describe the same node, so recurse
        # with the same name and tag resulting edges with the keyword.
        for keyword in ("allOf", "anyOf", "oneOf", "not"):
            if keyword not in d:
                continue
            print("Processing " + name + " as " + keyword)
            subschemas = d[keyword]
            if isinstance(subschemas, dict):
                # "not" holds a single schema rather than a list; iterating
                # the dict directly would recurse on its key strings.
                subschemas = [subschemas]
            for obj in subschemas:
                print("Following " + name + " to one of " + keyword)
                keynames(schema, obj, name, g, relationship=keyword)
            if keyword != "allOf":
                # Trailing message kept so the printed trace is unchanged.
                print("Processing " + name + " as " + keyword)

        if "$ref" in d:
            print("Processing " + name + " as $ref")
            ref = d["$ref"].split("/")
            target = ".".join(ref)
            # Make sure name connects to the referenced path.
            g.add_edge(name, target, relationship=relationship)
            print("Following " + name + " to " + target)
            if not list(g.successors(target)):  # ref hasn't been parsed
                keynames(schema, getattr(schema, ".".join(ref[1:])), target, g)

        # Some objects don't set "type": "object", so key off "properties".
        if "properties" in d:
            print("Processing " + name + " as object")
            _follow_properties(schema, d["properties"], name, g, relationship)

        if isinstance(d.get("additionalProperties", False), dict):
            keynames(schema, d["additionalProperties"], name, g)

        if "patternProperties" in d:
            print("Processing " + name + " as object")
            _follow_properties(schema, d["patternProperties"], name, g, relationship)

        if d.get("type", "") == "array":
            print("Processing " + name + " as array")
            name2 = name + ".items"
            g.add_node(name2)
            g.add_edge(name, name2, relationship=relationship)
            print("Following " + name + " to " + name2)
            if "items" in d:
                keynames(schema, d["items"], name2, g)

        print("Processing " + name + " for enumerations")
        for enum in d.get("enum", []):  # catchall.
            try:
                child = name + "." + enum
            except TypeError:
                # enum values may be numbers, booleans or null, not only strings.
                child = name + "." + str(enum)
            g.add_node(child)
            g.add_edge(name, child, relationship=relationship)
        return g
    except Exception:
        print(name)
        raise
# to use
# Load schema. open() does not perform shell-style "~" expansion, so the
# home-relative paths below are expanded explicitly.
vera_file = os.path.expanduser("~/Documents/Development/vega-lite/build/vega-lite-schema.json")
with open(vera_file, 'r') as filehandle:
    schema = json.load(filehandle)
schemaObj = objdict(schema)
# convert to graph, rooted at the "#" node
g = nx.DiGraph()
g.add_node("#")
g = keynames(schemaObj, schemaObj, "#", g)
# Add some analysis: annotate each node reachable from "#" with its
# shortest-path length from "#"
p=nx.shortest_path_length(g,source="#")
nx.set_node_attributes(g, p, "Distance from #")
# save
schema_graph_file = os.path.expanduser("~/Pictures/vega-lite-schema.gexf")
nx.write_gexf(g, schema_graph_file)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment