Last active
August 11, 2017 17:39
-
-
Save lawlesst/5192700 to your computer and use it in GitHub Desktop.
A Python script to parse VIVO list view config XML files and issue SPARQL queries contained within them.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
Utility script to parse VIVO listViewConfig.xml files and issue the
SPARQL queries contained within them.

Requires
- rdflib_sparql
- SPARQLWrapper

Run as
$ python generate_listview.py /path/to/listViewConfig-awardOrHonor.xml
"""
import os | |
from pprint import pprint | |
import re | |
import sys | |
import xml.etree.ElementTree as ET | |
import rdflib | |
from rdflib import Graph | |
from rdflib_sparql.processor import prepareQuery | |
from SPARQLWrapper import SPARQLWrapper, JSON, N3 | |
#Console logging for this script.
#http://dancingpenguinsoflight.com/2009/03/simple-and-effective-python-logging/
import logging

#The logger is named after the invoked script path.
logger = logging.getLogger(sys.argv[0])
logger.setLevel(logging.DEBUG)

#Adjacent string literals are joined at compile time.  This replaces a
#backslash continuation inside the literal, which would silently embed
#any indentation of the continuation line into the log format.
formatter = logging.Formatter(
    "%(asctime)s - %(name)s - %(lineno)s - "
    "%(levelname)s - %(message)s"
)

#Send every record to the console (stderr by default).
conlog = logging.StreamHandler()
conlog.setFormatter(formatter)
logger.addHandler(conlog)
#Connect to VIVO's SPARQL endpoint.  The base URL and the credentials
#are read from the VIVO_URL, VIVO_USER and VIVO_PASS environment variables.
from utility_scripts.vivo_sparql import VIVOSparql

sparql = VIVOSparql(os.environ.get('VIVO_URL'))
sparql.setCredentials(
    os.environ.get('VIVO_USER'),
    os.environ.get('VIVO_PASS'),
)
#Log in now so that the queries issued below carry the session cookie.
sparql.login()
def pre_process_listview(listview_file):
    """
    Read a list view config file and strip its marker tags.

    The <collated> and <critical-data-required> opening and closing
    tags are removed from the raw text; whatever they enclosed is kept.

    listview_file -- path to the listViewConfig XML file to read.

    Returns the file contents as a string with those tags removed.
    """
    #Read the file named by the argument rather than sys.argv[1], so the
    #function works for any path and not only the command line one.
    with open(listview_file) as raw_file:
        list_view_xml = raw_file.read()
    for tag in ('collated', 'critical-data-required'):
        list_view_xml = list_view_xml\
            .replace('<%s>' % tag, '')\
            .replace('</%s>' % tag, '')
    return list_view_xml
def process_listview_select(tree):
    """
    Extract the SELECT query and its projected field names.

    tree -- parsed list view config element; its first <query-select>
        child holds the query text.

    Returns a (select, field_list) tuple: the raw query text and the
    variable names (without the leading '?') listed between
    "SELECT DISTINCT" and the first "WHERE {".  The names are used to
    put results in a list of dictionaries that's easier to read.

    Raises ValueError when there is no <query-select> text or when the
    query does not start its projection with "SELECT DISTINCT ?...".
    """
    select_node = tree.find('query-select')
    if select_node is None or select_node.text is None:
        raise ValueError('No <query-select> query found in list view config.')
    select = select_node.text
    #Raw string so \s, \? and \{ reach the regex engine untouched.  The
    #non-greedy group stops at the first WHERE, so a nested sub-select
    #cannot leak into the field list.
    get_selects = re.compile(r"SELECT DISTINCT\s+(\?.*?)\s+WHERE\s*\{", re.DOTALL)
    match = get_selects.search(select)
    if match is None:
        raise ValueError('Could not find "SELECT DISTINCT ?... WHERE {" in query-select.')
    projection = match.group(1).replace('\n', ' ')
    #Splitting on '?' leaves one chunk per variable; drop the empty ones.
    field_list = [
        chunk.strip() for chunk in projection.split('?') if chunk.strip()
    ]
    return (select, field_list)
#Values substituted for the ?subject and ?property variables in each
#CONSTRUCT query of the list view config.
bindings = {
    'subject': '<http://vivo.school.edu/individual/jsmith>',
    'property': 'core:authorInAuthorship',
}

#Local graph that collects the output of every CONSTRUCT query.
g = Graph()

try:
    list_view_xml = pre_process_listview(sys.argv[1])
    root = ET.fromstring(list_view_xml)

    for construct_query in root.findall('query-construct'):
        query = construct_query.text
        for name, value in bindings.items():
            #\b limits the substitution to the exact variable, so a longer
            #name that merely starts with it is left alone.  The callable
            #replacement keeps the value from being read as a regex template.
            query = re.sub(
                r'\?%s\b' % re.escape(name),
                lambda match, value=value: value,
                query
            )
        logger.debug('SPARQL:\n%s', query)
        sparql.setQuery(query)
        results = sparql.queryAndConvert()
        g.parse(data=results, format='n3')

    #Run the final SELECT against the locally assembled graph.
    select_query, field_list = process_listview_select(root)
    logger.debug('FINAL SPARQL SELECT:\n%s', select_query)

    #Pair each result row with the field names for readable output.
    results_list = [
        dict(zip(field_list, row)) for row in g.query(select_query)
    ]

    for row in results_list:
        pprint(row)
finally:
    #Always end the VIVO session, even when a query or parse step fails.
    sparql.logout()
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""
A subclass of SPARQLWrapper that works with the built-in
SPARQL admin interface that is part of the VIVO web application.

At the moment, it only returns RS_JSON for SELECT queries and
N3 for CONSTRUCT queries.

Requires
- SPARQLWrapper
- requests - for maintaining sessions.
"""
from pprint import pprint | |
import urllib | |
import urllib2 | |
from rdflib import Graph | |
import requests | |
from SPARQLWrapper import SPARQLWrapper, JSON, N3 | |
from SPARQLWrapper.SPARQLExceptions import QueryBadFormed, EndPointNotFound, EndPointInternalError | |
from SPARQLWrapper.Wrapper import _SPARQL_JSON | |
#VIVO serves its RS_JSON results with the application/javascript
#content type, so add that type to SPARQLWrapper's list of JSON types.
_SPARQL_JSON.extend(['application/javascript'])
class VIVOSparql(SPARQLWrapper):
    """
    Extension of SPARQLWrapper to work with the built-in VIVO
    SPARQL query interface.  Eliminates the need to use Fuseki
    for SPARQL non-update queries.

    Usage: construct with the VIVO base URL, call setCredentials()
    and login(), issue queries with setQuery(), then call logout().
    """

    def __init__(self, vivo_url, **kwargs):
        """
        vivo_url -- base URL of the VIVO web application.  A trailing
            slash is added when missing so the paths appended below
            always form valid URLs.
        kwargs -- extra keyword arguments forwarded to
            SPARQLWrapper.__init__.
        """
        self.session = requests.session()
        if not vivo_url.endswith('/'):
            vivo_url += '/'
        self.vivo_url = vivo_url
        #Add the VIVO SPARQL end point path to the VIVO url.
        self.endpoint = vivo_url + 'admin/sparqlquery'
        #Unpack kwargs; passing the dict positionally would bind it to
        #the wrapper's second positional parameter instead.
        SPARQLWrapper.__init__(self, self.endpoint, **kwargs)
        #The resultFormat and rdfResultFormat are required
        #parameters for the VIVO SPARQL interface.
        #These are set to RS_JSON for SELECT and
        #N3 for CONSTRUCT.  These can be overridden when
        #called but have not been tested.
        self.addCustomParameter('resultFormat', 'RS_JSON')
        self.addCustomParameter('rdfResultFormat', 'N3')

    def login(self):
        """
        Post the credentials to VIVO's authenticate URL and keep the
        resulting session cookies in self.cookies for later queries.

        Reads self.user and self.passwd, so the credentials must have
        been set before this is called.
        """
        payload = {
            'loginName': self.user,
            'loginPassword': self.passwd,
            'loginForm': 'Log in'
        }
        #NOTE(review): verify=False turns off TLS certificate checking
        #for this request - confirm that this is intended.
        self.session.post(self.vivo_url + 'authenticate',
                          data=payload,
                          verify=False)
        #A Cookie header separates name=value pairs with '; '.
        self.cookies = '; '.join(
            '%s=%s' % (name, value)
            for name, value in self.session.cookies.items()
        )

    def logout(self):
        """
        Request VIVO's logout URL.

        Returns True when the first response in the redirect history
        is a 302.  Raises Exception('Logout failed.') otherwise,
        including when the request was not redirected at all.
        """
        resp = self.session.get(self.vivo_url + 'logout')
        #Check response history for logout.
        history = resp.history
        if history and history[0].status_code == 302:
            return True
        raise Exception('Logout failed.')

    def setQuery(self, query):
        """
        Set the query and choose the return format from its text:
        N3 when it mentions CONSTRUCT or DESCRIBE (any case), JSON
        otherwise.
        """
        lowered = query.lower()
        if 'construct' in lowered or 'describe' in lowered:
            self.setReturnFormat(N3)
        else:
            self.setReturnFormat(JSON)
        SPARQLWrapper.setQuery(self, query)

    def _query(self):
        """
        Override _query method to use cookies acquired on login.

        Returns a (response, returnFormat) tuple.  Raises
        QueryBadFormed, EndPointNotFound or EndPointInternalError for
        HTTP 400, 404 and 500; any other HTTPError is re-raised.
        """
        request = self._createRequest()
        opener = urllib2.build_opener()
        opener.addheaders.append(('Cookie', self.cookies))
        try:
            response = opener.open(request)
        except urllib2.HTTPError as e:
            if e.code == 400:
                raise QueryBadFormed()
            elif e.code == 404:
                raise EndPointNotFound()
            elif e.code == 500:
                raise EndPointInternalError(e.read())
            raise
        return (response, self.returnFormat)

    def results_graph(self):
        """
        Shortcut for use with CONSTRUCT queries.  Returns
        results as an RDFLib graph.
        """
        resp, rformat = self._query()
        #The graph parser is given the lower-case format name.
        if rformat == 'N3':
            rformat = 'n3'
        g = Graph()
        g.parse(resp, format=rformat)
        return g
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment