Skip to content

Instantly share code, notes, and snippets.

@joernhees
Last active July 31, 2020 02:35
Show Gist options
  • Save joernhees/5309883 to your computer and use it in GitHub Desktop.
Save joernhees/5309883 to your computer and use it in GitHub Desktop.
#!/usr/bin/python2.7
# -*- coding: utf-8 -*-
'''
Created on 21.03.2013
@author: joern
'''
import sys, os
import json, csv
from collections import defaultdict, Counter, OrderedDict
from operator import itemgetter,attrgetter
from functools import partial, wraps
import itertools, copy
import rdflib
from rdflib.term import Identifier, URIRef, BNode, Literal, Variable
from rdflib.namespace import Namespace, RDFS, SKOS
from rdflib import Graph, ConjunctiveGraph
from SPARQLWrapper import SPARQLWrapper, SPARQLWrapper2, JSON
import logging
# Module-level logger used by SPARQLChain for info/debug/warning output.
log = logging.getLogger(__name__)
# Default number of source terms sent in one SPARQL request; used by
# SPARQLChain when no chunk_size is given.
CHUNK_SIZE = 64
def grouper(n, iterable, fillvalue=None):
    """Collect the items of ``iterable`` into tuples of length ``n``.

    The last tuple is padded with ``fillvalue`` if the input does not divide
    evenly:

        grouper(3, 'ABCDEFG', 'x') --> ABC DEF Gxx

    Args
        n: size of each group.
        iterable: any iterable to be chunked.
        fillvalue: padding value for the last, incomplete group.
    Returns
        An iterator over n-tuples.
    """
    # izip_longest (Python 2) was renamed to zip_longest in Python 3.
    zip_longest = getattr(itertools, 'zip_longest', None) or itertools.izip_longest
    # The *same* iterator repeated n times: every output tuple therefore
    # consumes n consecutive items from the input.
    args = [iter(iterable)] * n
    return zip_longest(fillvalue=fillvalue, *args)
class SPARQLChain(object):
    """SPARQLChain is called using chaining and remembers associations.

    A SPARQLChain object essentially stores the associations between sources
    and targets in its associations dict.

    Generating a new SPARQLChain object allows you to pass in some initial
    roots (typically URIs). Follow up objects can be created by calling the
    first object passing it a SPARQL pattern which essentially will connect
    sources with targets. The sources of the new object are all targets of
    the old object. The call returns a new object keeping the one it was called
    on unmodified. This allows easy chaining of simple patterns and saving
    intermediate results.

    >>> DBP = Namespace('http://dbpedia.org/resource/')
    >>> sc = SPARQLChain('http://dbpedia.org/sparql', DBP['Berlin'])
    >>> types = sc('?s a ?o')
    >>> berlin_labels = sc('?b rdfs:label ?l', Variable('b'), Variable('l'))

    The associations between source (default: '?s') and target (default: '?o')
    are stored as a dictionary of sets, so a single source can be associated
    with multiple targets:

    >>> sorted(types.items())[:2] # doctest: +NORMALIZE_WHITESPACE
    [(rdflib.term.URIRef(u'http://dbpedia.org/resource/Berlin'),
    rdflib.term.URIRef(u'http://dbpedia.org/class/yago/CapitalsInEurope')),
    (rdflib.term.URIRef(u'http://dbpedia.org/resource/Berlin'),
    rdflib.term.URIRef(u'http://dbpedia.org/class/yago/EuropeanCapitalsOfCulture'))]

    The SPARQLChain objects support the usual dictionary methods which are
    slightly modified to allow easy use. For example, unlike a normal dict,
    the .values() method does not return a collection of sets but just a flat
    merged result set over all targets.

    >>> DBP['Berlin'] in berlin_labels
    True
    >>> Literal('Berlin', lang='de') in berlin_labels.values()
    True
    >>> type_labels = types("?t rdfs:label ?l. FILTER(lang(?l)='en')",
    ... Variable('t'), Variable('l'))
    >>> type_labels[URIRef('http://dbpedia.org/ontology/City')]
    set([rdflib.term.Literal(u'city', lang=u'en')])

    Any of the follow-up SPARQLChain chain objects can be used to iterate
    over the whole chain (see iter_chain()) or iterate over the end to end
    pairs of the whole chain (see iter_end2end()):

    >>> sorted(type_labels.iter_chain())[:2] # doctest: +NORMALIZE_WHITESPACE
    [[rdflib.term.URIRef(u'http://dbpedia.org/resource/Berlin'),
    rdflib.term.URIRef(u'http://dbpedia.org/class/yago/CapitalsInEurope'),
    rdflib.term.Literal(u'Capitals in Europe', lang=u'en')],
    [rdflib.term.URIRef(u'http://dbpedia.org/resource/Berlin'),
    rdflib.term.URIRef(u'http://dbpedia.org/class/yago/EuropeanCapitalsOfCulture'),
    rdflib.term.Literal(u'European Capitals of Culture', lang=u'en')]]
    >>> sorted(type_labels.iter_end2end())[:2] # doctest: +NORMALIZE_WHITESPACE
    [(rdflib.term.URIRef(u'http://dbpedia.org/resource/Berlin'),
    rdflib.term.Literal(u'Bundesland', lang=u'en')),
    (rdflib.term.URIRef(u'http://dbpedia.org/resource/Berlin'),
    rdflib.term.Literal(u'Capitals in Europe', lang=u'en'))]

    SPARQLChain objects can be merged with + or += methods. Note that the
    resulting object is the root of a new chain.

    >>> labels = berlin_labels + type_labels
    >>> labels.get_one(DBP['Berlin'])
    rdflib.term.Literal(u'Berliini', lang=u'fi')
    """

    class StartElem(object):
        """A void element designating the start of a chain.

        Usually gets generated when SPARQLChain is called with some roots.
        """
        pass

    def __init__(self, endpoint, roots=None, chunk_size=None):
        """Instantiate a new SPARQLChain chain (root element).

        Args
            endpoint: the SPARQL endpoint URI
            roots: if given the inputs for the first chaining operation:
                a single URIRef or a list/tuple of URIRefs.
            chunk_size: lets you specify how many URIs to query in one request
                (default=64)
        Raises
            TypeError: if roots is neither a URIRef nor made up of URIRefs.
        """
        self.endpoint = endpoint
        self.chunk_size = chunk_size or CHUNK_SIZE
        # source -> set(target1, target2, ...)
        self.associations = defaultdict(set)
        self.parent = None
        # True only for a chain start that was created with explicit roots.
        self.is_root = False
        if roots:
            if isinstance(roots, URIRef):
                # URIRef is a string type: wrap it so set() below does not
                # split it into characters.
                roots = [roots]
            else:
                # Other iterables (e.g. the set returned by values()) are
                # tolerated as before, but their content is now validated.
                try:
                    roots = list(roots)
                except TypeError:
                    roots = None
                if roots is None or not all(isinstance(u, URIRef) for u in roots):
                    raise TypeError(
                        "roots can be a list or tuple of URIRefs or a URIRef")
            self.associations[SPARQLChain.StartElem] = set(roots)
            self.is_root = True

    def __call__(self,
                 pattern,
                 source=Variable('s'),
                 target=Variable('o'),
                 endpoint=None,
                 chunk_size=None):
        """Will construct a source-to-target sparql query and remember the associations.

        The targets of this object are bound to ``source`` (in chunks) and the
        resulting (source, target) pairs are stored in a new object.

        Args
            pattern: A sparql select's where clause.
            source: Variable name for source objects ('?s' by default)
            target: Variable name for target objects ('?o' by default)
            endpoint: A new SPARQL endpoint URI if desired.
            chunk_size: This query and all chained queries will use this
                chunk_size.
        Returns
            A new SPARQLChain object which can be iterated over or used to
            chain further calls, or None if this object has no targets to
            continue from.
        """
        try:
            string_types = basestring  # Python 2: str and unicode
        except NameError:
            string_types = str  # Python 3
        assert isinstance(pattern, string_types)
        assert isinstance(source, Variable)
        assert isinstance(target, Variable)
        endpoint = endpoint or self.endpoint
        chunk_size = chunk_size or self.chunk_size

        inputs = self.values()
        if not inputs:
            log.info('called chaining on empty chain')
            return None

        res = SPARQLChain(endpoint, chunk_size=chunk_size)
        query_template = u'''
            select distinct %(source)s %(target)s where {
                {
                    %(pattern)s
                }
                %(values)s
            }'''
        source_n3 = source.n3()
        target_n3 = target.n3()
        # Result rows are keyed by plain variable-name strings; look them up
        # with plain strings too, because rdflib terms do not necessarily
        # compare equal to plain strings of the same text.
        source_key = u'%s' % source
        target_key = u'%s' % target

        # Use the effective chunk_size (argument or inherited), in line with
        # how the endpoint override is applied to this very query.
        for values in grouper(chunk_size, inputs):
            # workaround for VALUES bug https://github.com/openlink/virtuoso-opensource/issues/28
            # grouper pads the last chunk with None: drop only that padding,
            # not real terms that merely happen to be falsy (e.g. Literal('')).
            values_clause = u'FILTER(%s)' % (
                u'\n || '.join(source_n3 + u'=' + s.n3()
                               for s in values if s is not None))
            bindings = {
                'source': source_n3,
                'target': target_n3,
                'pattern': pattern,
                'values': values_clause,
            }
            query = query_template % bindings
            log.debug(query)
            sparql = SPARQLWrapper(endpoint)
            sparql.setReturnFormat(JSON)
            sparql.setQuery(query)
            results_conv = sparql.queryAndConvert()
            results_rdflib = sparqlJSONresultBindingsToRDFlib(
                results_conv['results']['bindings'])
            for binding in results_rdflib:
                res.associations[binding[source_key]].add(binding[target_key])
        res.parent = self
        return res

    def __iter__(self):
        """Iterate over the sources (keys) of the associations."""
        return iter(self.associations)

    def get(self, key, default=None):
        """Returns the values associated with key or default."""
        return self.associations.get(key, default)

    def get_one(self, key, default=None):
        """Returns the alpha-num first element from the values for key or default."""
        targets = self.get(key)
        if not targets:
            # unknown key or no targets: nothing to pick from
            return default
        return sorted(targets)[0]

    def __getitem__(self, key):
        """Get the associated targets for a given source.

        Returns
            The set of targets (empty if key is unknown).
        """
        # Plain indexing of the defaultdict would insert an empty set for
        # every unknown key and thereby change keys() / __contains__ as a
        # side effect of a mere lookup (e.g. during iter_chain()).
        return self.associations.get(key, set())

    def __setitem__(self, key, value=None):
        raise NotImplementedError('not sure this is a good idea')

    def __delitem__(self, key):
        raise NotImplementedError('not sure this is a good idea')

    def __contains__(self, key):
        """Check if we have an associated value for key."""
        return key in self.associations

    def keys(self):
        """Returns all association keys (the sources)."""
        return list(self.associations)

    def values(self):
        """Returns a uniquified set of all returning values (the targets)."""
        return set(itertools.chain.from_iterable(self.associations.values()))

    def items(self):
        """Collect all retrieved targets for all sources.

        Returns
            A list of (source,target) pairs, similar to a multidict.
        """
        return list(self.iteritems())

    def iteritems(self):
        """Iterate over all retrieved targets for all sources.

        Returns
            Pairs of (source,target), similar to a multidict.
        """
        # .items() works on Python 2 and 3 alike
        for source, targets in self.associations.items():
            for target in targets:
                yield (source, target)

    def iter_chain(self):
        r"""Iterates over the whole chain of associations.

        For example let this be the association tree of
            a = SPARQLChain(ep, [s1,s2])
            b=a(pattern1)
            c=b(pattern2)

                 b          c
            s1-->s1.1-->t1.1.1
                     \->t1.1.2
            s2-->s2.1-->t2.1.1
              \->s2.2-->t2.2.1

        c.iter_chain will now iterate over all combinations of the tree:
            [s1,s1.1,t1.1.1],
            [s1,s1.1,t1.1.2],
            [s2,s2.1,t2.1.1],
            [s2,s2.2,t2.2.1]

        Returns
            A generator over lists of association chain elements.
        """
        p = self.parent
        if p is None:
            # we're the start of the chain!
            if self.is_root:
                for t in self.values():
                    yield [t]
            else:
                # could for example happen by adding two results together
                for s, t in self.iteritems():
                    yield [s, t]
        else:
            for ptree in p.iter_chain():
                # extend every parent path by each target of its last element
                for target in self[ptree[-1]]:
                    yield ptree + [target]

    def iter_end2end(self):
        r"""Called on the chain end iterates over all pairs of (root_s,final_t).

        For example let this be the association tree of
            a = SPARQLChain(ep, [s1,s2])
            b=a(pattern1)
            c=b(pattern2)

                 b          c
            s1-->s1.1-->t1.1.1
                     \->t1.1.2
            s2-->s2.1-->t2.1.1
              \->s2.2-->t2.2.1

        c.iter_end2end will now iterate over all pairs of the root sources
        (left most) and final targets (right most) of the tree:
            [s1,t1.1.1],
            [s1,t1.1.2],
            [s2,t2.1.1],
            [s2,t2.2.1]

        Returns
            A generator over (root_source,final_target) pairs.
        """
        for assoc in self.iter_chain():
            # leave out intermediate assocs
            yield (assoc[0], assoc[-1])

    def __add__(self, other):
        """Merges two SPARQLChain objects returning a new one.

        Notice that the parent of the returned new object is None.
        The endpoint and chunk_size of the left object are used.
        """
        assert isinstance(other, SPARQLChain)
        res = SPARQLChain(self.endpoint, chunk_size=self.chunk_size)
        # copy, so that neither operand shares its target sets with res
        res.associations = copy.deepcopy(self.associations)
        # add all from other to res.associations
        for s, t in other.iteritems():
            res.associations[s].add(t)
        return res

    def __iadd__(self, other):
        """Update this SPARQLChain with all associations from another.

        Warning: parent will be set to None."""
        assert isinstance(other, SPARQLChain)
        if self.is_root:
            log.warning("adding to root node, do you know what you're doing?")
        # add all from other to self.associations
        for s, t in other.iteritems():
            self.associations[s].add(t)
        self.parent = None
        return self

    def __str__(self):
        """One (source, target) pair per line, sorted."""
        return '[' + ',\n '.join(str(l) for l in sorted(self.items())) + ']'
def sparqlJSONresultBindingsToRDFlib(resBindings):
    """Convert SPARQL JSON result bindings into rdflib terms.

    Converts a result's bindings as retrieved in res["results"]["bindings"]
    by SPARQLWrapper with a sparql select query into the corresponding
    list with rdflib terms, e.g., Literal, URIRef, BNode.

    BNodes won't be mixed between iterated calls of this function even if
    they happen to have the same "value". Internally the given value is mapped
    to a random value, which is remembered in _one and the same_ call of this
    function only.

    Args
        resBindings: iterable of rows, each a dict mapping a variable name to
            a term description dict (or None).
    Returns
        A list with one dict per row, mapping variable name to rdflib term.
    Raises
        rdflib.exceptions.ParserError: for a term description of unknown type.
    """
    _bnodes = {}  # makes sure we don't confuse BNodes from different results

    def dictToRDFlib(d):
        """Map a dict following the syntax in
        http://www.w3.org/TR/rdf-sparql-json-res/ to the corresponding
        rdflib term."""
        if d is None:
            return None
        t = d["type"]
        v = d["value"]
        if t == "uri":
            return URIRef(v)
        if t == "bnode":
            if v not in _bnodes:
                # v is not used as BNode value on purpose (multiple calls
                # should not have the same value)
                _bnodes[v] = BNode()
            return _bnodes[v]
        lang = d.get("xml:lang", None)
        if t == "literal":
            # SPARQL 1.1 JSON results describe typed literals as type
            # "literal" with an additional "datatype" member; keep it instead
            # of silently dropping it. A language tag takes precedence so
            # rows that converted before still convert.
            datatype = d.get("datatype") if lang is None else None
            return Literal(v, lang=lang, datatype=datatype)
        if t == "typed-literal":
            # will raise type error if lang and datatype set
            return Literal(v, lang=lang, datatype=d["datatype"])
        raise rdflib.exceptions.ParserError(
            "Invalid sparql json result according to http://www.w3.org/TR/rdf-sparql-json-res/: {0}".format(d))

    return [
        {k: dictToRDFlib(v) for k, v in row.items()}
        for row in resBindings
    ]
def main():
    """Run a small live demo against DBpedia, then the module's doctests.

    The demo fetches English labels of the Semantic Web category, its
    sub-categories and their English labels. Nothing is printed; the results
    are only bound to local names (useful under a debugger).
    """
    logging.basicConfig(level=logging.INFO)
    from rdflib.namespace import Namespace, RDFS, SKOS
    from pprint import pprint

    dbpedia_ns = Namespace('http://dbpedia.org/resource/')
    uri_var, label_var = Variable('uri'), Variable('label')

    label_pattern = '?uri rdfs:label ?label. %(labelLangFilter)s'
    english_only = {'labelLangFilter': "FILTER(lang(?label)='en')"}
    q_en = label_pattern % english_only

    chain_root = SPARQLChain(
        'http://dbpedia.org/sparql',
        dbpedia_ns['Category:Semantic_Web'],
    )

    # English labels of the root category itself.
    en_labels = chain_root(q_en, uri_var, label_var)

    # Direct sub-categories of the root and, chained on top, their labels.
    sub_cats = chain_root('?subcat skos:broader ?cat.',
                          Variable('cat'), Variable('subcat'))
    sc_labels = sub_cats(q_en, uri_var, label_var)

    import doctest
    doctest.testmod()
# Run the demo and doctests only when executed as a script, not on import.
if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment