Skip to content

Instantly share code, notes, and snippets.

@gdbassett
Last active September 21, 2017 20:55
Show Gist options
  • Save gdbassett/72ed43fa4168596442850e684e0194d7 to your computer and use it in GitHub Desktop.
Save gdbassett/72ed43fa4168596442850e684e0194d7 to your computer and use it in GitHub Desktop.
Function to convert a JSON schema (e.g. the vega-lite schema) into a networkx directed graph
import inspect
import json
import logging
import os

import networkx as nx  # NOTE: written against dev networkx 2.0
# Attach a DEBUG-level file handler to the root logger.
# logging.FileHandler does not expand "~" itself, so the home directory is
# resolved explicitly; otherwise the path would be looked up under a literal
# "~" directory relative to the current working directory.
logger = logging.getLogger()
fileLogger = logging.FileHandler(
    os.path.expanduser("~/Documents/Development/tmp/vega.log")
)
fileLogger.setLevel(logging.DEBUG)
logger.addHandler(fileLogger)
class objdict(dict):
    """A dict whose keys can be read, written and deleted as attributes.

    Attribute names may be dotted paths: ``getattr(d, "a.b.c")`` is
    equivalent to ``d["a"]["b"]["c"]``.  Every attribute assignment is stored
    as a dictionary item, never as an instance attribute.

    A path that cannot be resolved (missing key, or an intermediate value
    that cannot be indexed by a string key) raises ``AttributeError``; the
    underlying ``KeyError``/``TypeError`` is attached as ``__cause__``.
    """

    def __getattr__(self, name):
        # Only reached when normal attribute lookup fails, so real dict
        # methods and the recursive_* helpers below are unaffected.
        try:
            return self.recursive_getattr(self, name)
        except (KeyError, TypeError) as err:
            raise AttributeError("No such attribute: " + name) from err

    def __setattr__(self, name, value):
        try:
            self.recursive_setattr(self, name, value)
        except (KeyError, TypeError) as err:
            raise AttributeError("No such attribute: " + name) from err

    def __delattr__(self, name):
        try:
            self.recursive_delattr(self, name)
        except (KeyError, TypeError) as err:
            raise AttributeError("No such attribute: " + name) from err

    def recursive_setattr(self, o, name, value):
        """Set ``value`` at the dotted path ``name`` inside container ``o``."""
        head, dot, rest = name.partition(".")
        if dot:
            self.recursive_setattr(o[head], rest, value)
        else:
            o[head] = value

    def recursive_getattr(self, o, name):
        """Return the value at the dotted path ``name`` inside container ``o``."""
        head, dot, rest = name.partition(".")
        if dot:
            return self.recursive_getattr(o[head], rest)
        return o[head]

    def recursive_delattr(self, o, name):
        """Delete the entry at the dotted path ``name`` inside container ``o``."""
        head, dot, rest = name.partition(".")
        if dot:
            self.recursive_delattr(o[head], rest)
        else:
            del o[head]
def _add_child_keys(schema, children, name, g, relationship):
    """Add a node and edge under ``name`` for each key of ``children``, then recurse into its value."""
    try:
        for key, subschema in children.items():
            child = name + "." + key
            g.add_node(child)
            g.add_edge(name, child, relationship=relationship)
            print("Following " + name + " to " + child)
            keynames(schema, subschema, child, g)
    except Exception:
        # Record which keys were being walked before propagating the error.
        logging.error(children.keys())
        raise


def keynames(schema, d, name, g, relationship="normal"):
    """Recursively record the key structure of a JSON-schema fragment in a graph.

    Args:
        schema: The root schema.  ``$ref`` targets are resolved with
            ``getattr(schema, "a.b.c")``, so it must support dotted attribute
            access.
        d: The schema fragment to process.  Anything that is not a dict adds
            nothing to the graph.
        name: Graph node name of ``d``; child nodes are named
            ``name + "." + key``.
        g: Graph to populate.  Must provide ``add_node``, ``add_edge`` and
            ``successors``.
        relationship: Value stored in the ``relationship`` attribute of the
            edges created directly from ``name``.  Subschemas reached through
            ``allOf``/``anyOf``/``oneOf``/``not`` are processed with the
            keyword as their relationship.

    Returns:
        The same graph ``g``.

    Raises:
        Exception: Any error raised while walking is re-raised after printing
            ``name`` (once per recursion level as the stack unwinds).
    """
    try:
        if not isinstance(d, dict):
            print("Processing " + name + " for enumerations")
            return g

        # Schema combinators: the subschemas describe the same node, so they
        # are walked under the same name with the keyword as relationship.
        for keyword in ("allOf", "anyOf", "oneOf", "not"):
            if keyword not in d:
                continue
            print("Processing " + name + " as " + keyword)
            subschemas = d[keyword]
            if isinstance(subschemas, dict):
                # "not" holds a single schema rather than a list; iterating
                # the dict would only yield its key strings.
                subschemas = [subschemas]
            for subschema in subschemas:
                print("Following " + name + " to one of " + keyword)
                keynames(schema, subschema, name, g, relationship=keyword)

        if "$ref" in d:
            print("Processing " + name + " as $ref")
            ref = d["$ref"].split("/")
            target = ".".join(ref)
            # Make sure name connects to the referenced path.
            g.add_edge(name, target, relationship=relationship)
            print("Following " + name + " to " + target)
            # A target without outgoing edges is treated as not parsed yet.
            if not list(g.successors(target)):
                keynames(schema, getattr(schema, ".".join(ref[1:])), target, g)

        # Some objects don't have a 'type' set, so key off "properties" itself.
        if "properties" in d:
            print("Processing " + name + " as object")
            _add_child_keys(schema, d["properties"], name, g, relationship)

        extra = d.get("additionalProperties")
        if isinstance(extra, dict):
            keynames(schema, extra, name, g)

        if "patternProperties" in d:
            print("Processing " + name + " as object")
            _add_child_keys(schema, d["patternProperties"], name, g, relationship)

        if d.get("type", "") == "array":
            print("Processing " + name + " as array")
            items_name = name + ".items"
            g.add_node(items_name)
            g.add_edge(name, items_name, relationship=relationship)
            print("Following " + name + " to " + items_name)
            if "items" in d:
                keynames(schema, d["items"], items_name, g)

        print("Processing " + name + " for enumerations")
        for value in d.get("enum", []):  # catchall.
            # Enum members may be any JSON value; render non-strings as JSON
            # text so they can be appended to the node name.
            label = value if isinstance(value, str) else json.dumps(value)
            child = name + "." + label
            g.add_node(child)
            g.add_edge(name, child, relationship=relationship)
        return g
    except Exception:
        print(name)
        raise
# to use
# Load the schema.  open() does not expand "~", so the home directory is
# resolved explicitly.
vera_file = os.path.expanduser(
    "~/Documents/Development/vega-lite/build/vega-lite-schema.json"
)
with open(vera_file, 'r', encoding="utf-8") as filehandle:
    schema = json.load(filehandle)
schemaObj = objdict(schema)
# Convert to a directed graph rooted at "#".
g = nx.DiGraph()
g.add_node("#")
g = keynames(schemaObj, schemaObj, "#", g)
# Add some analysis: store each node's shortest-path length from "#" as a
# node attribute (for the nodes shortest_path_length reports).
p = nx.shortest_path_length(g, source="#")
nx.set_node_attributes(g, p, "Distance from #")
# Save as GEXF; "~" is expanded here as well, for the same reason as above.
schema_graph_file = os.path.expanduser("~/Pictures/vega-lite-schema.gexf")
nx.write_gexf(g, schema_graph_file)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment