Last active
June 14, 2023 14:11
-
-
Save pebbie/5704765 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os, sys | |
from lxml import etree | |
from rdflib import ConjunctiveGraph, Namespace, exceptions | |
from rdflib import URIRef, RDFS, RDF, OWL, BNode, Literal | |
def get_tag_no_ns(tname): | |
if "}" in tname: | |
return tname[tname.index("}")+1:] | |
else: | |
return tname | |
class UnknownPrefixException(Exception): | |
pass | |
def parse_dom(filename): | |
""" | |
parse xml document from given filename and returns rdflib.ConjunctiveGraph instance containing OWL ontology statements | |
""" | |
def reg_name(node): | |
""" | |
recursively traverse document tree for named tags and store in a global map for later referencing | |
""" | |
if "name" in node.attrib: | |
tagmap[node.attrib["name"]] = node | |
for child in node: | |
reg_name(child) | |
def resolve_type_instr(etype): | |
if ":" in etype: | |
#resolve prefixed to full qualified name | |
prefix = etype[:etype.index(":")] | |
if prefix in root.nsmap: | |
#lookup from declared namespace | |
full_prefix = root.nsmap[prefix] | |
if "targetNamespace" in root.attrib and full_prefix == root.attrib["targetNamespace"]: | |
etype = etype[etype.index(":")+1:] | |
else: | |
if full_prefix[-1] != "/": full_prefix += "#" | |
etype = full_prefix+etype[etype.index(":")+1:] | |
return etype | |
def process_choice(ename, childs): | |
anon_cls = ename+"_anon_class" | |
anon_list = anon_cls+"_list" | |
g.add(( NS[ename], OWL.subClassOf, BNode(anon_cls))) | |
g.add(( BNode(anon_cls), RDF.type, OWL.Class)) | |
g.add(( BNode(anon_cls), OWL.unionOf, BNode(anon_list) )) | |
#generate anonymous restricted classes (ARC) | |
ch2arc = {} #mapping from child property name to ARC | |
for ch in childs: | |
anon_list_r = "_%s_r" % (ch["name"]) | |
item = NS[anon_list_r] | |
is_first = True | |
g.add(( item, RDF.type, OWL.Class )) | |
prop_node = NS[property_prefix+ch["name"]] | |
g.add(( item, OWL.intersectionOf, BNode(anon_list_r) )) | |
if "minCardinality" in ch: | |
restriction_node = BNode("%s_on_%s" % (anon_list_r, "minCardinality")) | |
g.add(( restriction_node, RDF.type, OWL.Restriction )) | |
g.add(( restriction_node, OWL.onProperty, prop_node )) | |
g.add(( restriction_node, OWL.minCardinality, Literal(int(ch["minCardinality"])) )) | |
g.add(( BNode(anon_list_r), RDF.first, restriction_node)) | |
is_first = False | |
if "maxCardinality" in ch: | |
if not is_first: | |
#close previous list | |
next_node = "%s_next_%s" % (anon_list_r, "maxCardinality") | |
g.add(( BNode(anon_list_r), RDF.rest, BNode(next_node) )) | |
anon_list_r = next_node | |
is_first = False | |
restriction_node = BNode("%s_on_%s" % (anon_list_r, "maxCardinality")) | |
g.add(( restriction_node, RDF.type, OWL.Restriction )) | |
g.add(( restriction_node, OWL.onProperty, prop_node )) | |
g.add(( restriction_node, OWL.maxCardinality, Literal(int(ch["maxCardinality"])) )) | |
g.add(( BNode(anon_list_r), RDF.first, restriction_node)) | |
if "range" in ch: | |
if not is_first: | |
#close previous list | |
next_node = "%s_next_%s" % (anon_list_r, "range") | |
g.add(( BNode(anon_list_r), RDF.rest, BNode(next_node) )) | |
anon_list_r = next_node | |
restriction_node = BNode("%s_on_%s" % (anon_list_r, "range")) | |
g.add(( restriction_node, RDF.type, OWL.Restriction )) | |
g.add(( restriction_node, OWL.onProperty, prop_node )) | |
if ":" in ch["range"]: | |
prop_obj = URIRef(ch["range"]) | |
else: | |
prop_obj = NS[ch["range"]] | |
g.add(( restriction_node, OWL.allValuesFrom, prop_obj )) | |
g.add(( BNode(anon_list_r), RDF.first, restriction_node)) | |
g.add(( BNode(anon_list_r), RDF.rest, RDF.nil)) | |
ch2arc[ch["name"]] = item | |
#generate set differenced classes of ARCs (SDARC) | |
sdarcs = [] | |
for ch in childs: | |
complement_classes = [c["name"] for c in childs if c["name"] != ch["name"]] | |
sdarc = "_%s_without_%s" % (ch["name"], "".join(complement_classes)) | |
g.add(( NS[sdarc], RDF.type, OWL.Class )) | |
intersection_node = "%s_setdiff_" % (ch["name"]) | |
g.add(( NS[sdarc], OWL.intersectionOf, BNode(intersection_node) )) | |
g.add(( BNode(intersection_node), RDF.first, NS["_%s_r" % (ch["name"])] )) | |
next_intersection = "%s_setdiff_next" % (ch["name"]) | |
g.add(( BNode(intersection_node), RDF.rest, BNode(next_intersection) )) | |
intersection_node = next_intersection | |
next_intersection = "%s_setdiff_next_item" % (ch["name"]) | |
g.add(( BNode(intersection_node), RDF.first, BNode(next_intersection) )) | |
g.add(( BNode(intersection_node), RDF.rest, RDF.nil )) | |
union_of_complements = "%s_uoc" % (sdarc) | |
comp_list = "%s_complist" % (sdarc) | |
g.add(( BNode(next_intersection), OWL.complementOf, BNode(union_of_complements) )) | |
g.add(( BNode(union_of_complements), OWL.unionOf, BNode(comp_list) )) | |
for cci, cc in enumerate(complement_classes): | |
g.add(( BNode(comp_list), RDF.first, NS["_%s_r" % (cc)])) | |
if cci == len(complement_classes)-1: | |
g.add(( BNode(comp_list), RDF.rest, RDF.nil)) | |
else: | |
next_node = "%s_complist_%s" % (sdarc, cc) | |
g.add(( BNode(comp_list), RDF.rest, BNode(next_node) )) | |
comp_list = next_node | |
sdarcs.append(sdarc) | |
#generate unions of SDARCs | |
for si, sdarc in enumerate(sdarcs): | |
g.add(( BNode(anon_list), RDF.first, NS[sdarc] )) | |
if si == len(sdarcs)-1: | |
g.add(( BNode(anon_list), RDF.rest, RDF.nil)) | |
else: | |
next_node = "%s_complist_%s" % (anon_cls, sdarc) | |
g.add(( BNode(anon_list), RDF.rest, BNode(next_node) )) | |
anon_list = next_node | |
def process_seq_all(ename, childs): | |
anon_cls = ename+"_anon_class" | |
anon_list = anon_cls+"_list" | |
g.add(( NS[ename], OWL.subClassOf, BNode(anon_cls))) | |
g.add(( BNode(anon_cls), RDF.type, OWL.Class)) | |
g.add(( BNode(anon_cls), OWL.intersectionOf, BNode(anon_list) )) | |
restrictions = [] | |
for ci,ch in enumerate(childs): | |
if "minCardinality" in ch: | |
rst_item = {} | |
rst_item["property"] = NS[property_prefix+ch["name"]] | |
rst_item["predicate"] = OWL.minCardinality | |
rst_item["object"] = Literal(int(ch["minCardinality"])) | |
restrictions.append(rst_item) | |
if "maxCardinality" in ch: | |
rst_item = {} | |
rst_item["property"] = NS[property_prefix+ch["name"]] | |
rst_item["predicate"] = OWL.maxCardinality | |
rst_item["object"] = Literal(int(ch["maxCardinality"])) | |
restrictions.append(rst_item) | |
if "range" in ch: | |
rst_item = {} | |
rst_item["property"] = NS[property_prefix+ch["name"]] | |
rst_item["predicate"] = OWL.allValuesFrom | |
if ":" in ch["range"]: | |
rst_item["object"] = URIRef(ch["range"]) | |
else: | |
rst_item["object"] = NS[ch["range"]] | |
restrictions.append(rst_item) | |
for ci, ch in enumerate(restrictions): | |
#construct list of intersectionOf items | |
restriction_node = BNode(ch["property"]+"_anon_"+str(ci)) | |
g.add(( restriction_node, RDF.type, OWL.Restriction )) | |
g.add(( restriction_node, OWL.onProperty, ch["property"] )) | |
g.add(( restriction_node, ch["predicate"], ch["object"] )) | |
g.add(( BNode(anon_list), RDF.first, restriction_node)) | |
if ci == len(childs)-1: | |
g.add(( BNode(anon_list), RDF.rest, RDF.nil)) | |
else: | |
next_node = anon_cls+"_%d" % ci | |
g.add(( BNode(anon_list), RDF.rest, BNode(next_node) )) | |
anon_list = next_node | |
def convert_namedtype(node): | |
""" | |
handle types (simpleType and complexType) elements | |
""" | |
is_dt = False | |
tname = node.attrib["name"] | |
if tname not in types: | |
if node.tag == xs_st: | |
print tname, "is DatatypeProperty" | |
g.add(( NS[tname], RDF.type, OWL.DatatypeProperty )) | |
is_dt = True | |
#check for restriction | |
elif node.tag == xs_ct: | |
print tname, "is Class" | |
g.add(( NS[tname], RDF.type, OWL.Class )) | |
for cc in node: | |
tag = get_tag_no_ns(cc.tag) | |
if tag == "attribute": | |
aname = cc.attrib["name"] | |
if "type" in cc.attrib: | |
atype = resolve_type_instr(cc.attrib["type"]) | |
g.add(( NS[property_prefix+aname], RDFS.range, URIRef(atype) )) | |
else: | |
pass | |
print aname, "is DatatypeProperty from attr" | |
g.add(( NS[property_prefix+aname], RDF.type, OWL.DatatypeProperty )) | |
g.add(( NS[property_prefix+aname], RDFS.domain, NS[tname] )) | |
elif tag in ["sequence", "choice", "all"]: | |
childs = [] | |
for ce in cc: | |
#child element of named ct | |
if ce.tag == xs_el: | |
has_child = True | |
cename, info = convert_element(ce) | |
childs.append(info) | |
if not info["isClass"]: | |
g.add(( NS[property_prefix + cename], RDFS.domain, NS[tname] )) | |
if len(childs)>0: | |
if tag in ["sequence", "all"]: | |
process_seq_all(tname, childs) | |
elif tag == "choice": | |
process_choice(tname, childs) | |
types.append(tname) | |
return tname, is_dt | |
def convert_element(node, is_global=False): | |
""" | |
handle element conversion | |
""" | |
#print node | |
info = {} | |
has_cac = False #has child, attribute, or content | |
if "name" in node.attrib: | |
ename = node.attrib["name"] | |
info["name"] = ename | |
if "minOccurs" in node.attrib: | |
info["minCardinality"] = node.attrib["minOccurs"] | |
if "maxOccurs" in node.attrib: | |
info["maxCardinality"] = node.attrib["maxOccurs"] | |
if "type" in node.attrib: | |
etype = resolve_type_instr(node.attrib["type"]) | |
if XSD in etype: | |
print ename, "is DatatypeProperty" | |
g.add(( NS[property_prefix+ename], RDF.type, OWL.DatatypeProperty )) | |
info["range"] = etype | |
elif etype in tagmap: | |
typ, is_dt = convert_namedtype(tagmap[etype]) | |
if is_dt: | |
pass | |
else: | |
if is_global: | |
print ename, "sameAs", typ | |
g.add(( NS[ename], OWL.sameAs, NS[typ] )) | |
else: | |
print ename, "is ObjectProperty. \n\trange:", node.attrib["type"] | |
g.add(( NS[property_prefix+ename], RDF.type, OWL.ObjectProperty )) | |
info["range"] = typ | |
else: #unknown type name | |
print ":", etype | |
else: | |
for child in node: | |
#anonymous type | |
if child.tag == xs_st: | |
#element declared as simpleType converted into owl:DatatypeProperty | |
print ename, " is DatatypeProperty" | |
g.add(( NS[property_prefix+ename], RDF.type, OWL.DatatypeProperty )) | |
elif child.tag == xs_ct: | |
#element declared as complexType converted into owl:Class | |
for cc in child: | |
tag = get_tag_no_ns(cc.tag) | |
if tag == "attribute": | |
has_cac = True | |
aname = cc.attrib["name"] | |
if "type" in cc.attrib: | |
atype = resolve_type_instr(cc.attrib["type"]) | |
g.add(( NS[property_prefix+aname], RDFS.range, URIRef(atype) )) | |
else: | |
pass | |
print aname, "is DatatypeProperty from attr" | |
g.add(( NS[property_prefix+aname], RDF.type, OWL.DatatypeProperty )) | |
g.add(( NS[property_prefix+aname], RDFS.domain, NS[ename] )) | |
elif "Content" in tag: | |
#TODO | |
#simpleContent with attribute extension converted to Class with generated content property | |
#complexContent is a class with generated properties | |
has_cac = True | |
elif tag in ["sequence", "choice", "all"]: | |
#subelement specifier converted using set operations on equivalent classes | |
childs = [] | |
for ce in cc: | |
#child element of anon ct | |
if ce.tag == xs_el: | |
has_cac = True | |
cename, chinfo = convert_element(ce) | |
childs.append(chinfo) | |
if chinfo["isClass"]: | |
#declare object property | |
g.add(( NS[property_prefix+cename], RDF.type, OWL.ObjectProperty )) | |
else: | |
g.add(( NS[property_prefix+cename], RDFS.domain, NS[ename] )) | |
if len(childs)>0: | |
if tag in ["sequence", "all"]: | |
process_seq_all(ename, childs) | |
elif tag == "choice": | |
process_choice(ename, childs) | |
if has_cac: | |
g.add(( NS[ename], RDF.type, OWL.Class )) | |
print ename, "is Class" | |
info["isClass"] = has_cac | |
return ename, info | |
with open(filename, "r") as f: root = etree.parse(f).getroot() | |
tagmap = {} | |
types = [] | |
reg_name(root) | |
#predeclaration | |
XSD = "http://www.w3.org/2001/XMLSchema" | |
xs_st = "{%s}%s" % (XSD, "simpleType") | |
xs_ct = "{%s}%s" % (XSD, "complexType") | |
xs_el = "{%s}%s" % (XSD, "element") | |
xs_at = "{%s}%s" % (XSD, "attribute") | |
#graph | |
g = ConjunctiveGraph() | |
for k, v in root.nsmap.items(): | |
if k is not None: | |
g.bind(k, v) | |
if "targetNamespace" in root.attrib: | |
NS = Namespace(root.attrib["targetNamespace"]+"#") | |
else: | |
NS = Namespace("http://example.org/xsdowl#") | |
g.bind(None, NS) | |
g.bind("owl", OWL) | |
g.bind("xsd", XSD+"#") | |
property_prefix = "has_" | |
#do traversal directed translation | |
#print root.findall(xs_el) | |
for el in root.findall(xs_el): | |
convert_element(el, True) | |
return g | |
if __name__ == "__main__": | |
if len(sys.argv)>1: | |
graph = parse_dom(sys.argv[1]) | |
graph.serialize(sys.argv[1][:-4]+'.ttl', format='turtle') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment