Skip to content

Instantly share code, notes, and snippets.

@lawlesst
Last active August 11, 2017 17:39
Show Gist options
  • Save lawlesst/5192700 to your computer and use it in GitHub Desktop.
Save lawlesst/5192700 to your computer and use it in GitHub Desktop.
A Python script to parse VIVO list view config XML files and issue SPARQL queries contained within them.
"""
Utility script to parse VIVO listViewConfig.xml files.
Requires
- rdflib_sparql
- SPARQLWrapper
Run as
$ python generate_listview.py /path/to/listViewConfig-awardOrHonor.xml
"""
import os
from pprint import pprint
import re
import sys
import xml.etree.ElementTree as ET
import rdflib
from rdflib import Graph
from rdflib_sparql.processor import prepareQuery
from SPARQLWrapper import SPARQLWrapper, JSON, N3
# Console logging for this script: a logger named after sys.argv[0], set to
# DEBUG, writing through a StreamHandler with the format defined below.
# Recipe adapted from:
# http://dancingpenguinsoflight.com/2009/03/simple-and-effective-python-logging/
import logging
logger = logging.getLogger(sys.argv[0])
logger.setLevel(logging.DEBUG)
# The trailing backslash continues the string literal onto the next line, so
# the format is one line of text.
formatter = logging.Formatter("%(asctime)s - %(name)s - %(lineno)s - \
%(levelname)s - %(message)s")
conlog = logging.StreamHandler()
conlog.setFormatter(formatter)
logger.addHandler(conlog)
# Set up the client for VIVO's SPARQL endpoint. The base URL and credentials
# are read from the VIVO_URL, VIVO_USER and VIVO_PASS environment variables.
# NOTE(review): os.getenv returns None for an unset variable and nothing here
# checks for that -- confirm the variables are always exported before running.
from utility_scripts.vivo_sparql import VIVOSparql
sparql = VIVOSparql(os.getenv('VIVO_URL'))
sparql.setCredentials(os.getenv('VIVO_USER'), os.getenv('VIVO_PASS'))
# The login happens here, at module load, before any query is issued.
sparql.login()
def pre_process_listview(listview_file):
    """
    Read a listViewConfig XML file and strip its wrapper tags.

    The opening and closing <collated> and <critical-data-required> tags are
    removed from the text; whatever was between them is kept.

    listview_file -- path of the listViewConfig XML file to read.

    Returns the file's text with those tags removed.
    Raises IOError/OSError if the file cannot be opened.
    """
    # Read the path that was passed in. The previous version ignored the
    # argument and always opened sys.argv[1].
    with open(listview_file) as raw_file:
        list_view_xml = raw_file.read()
    # NOTE(review): presumably these tags are stripped so that the embedded
    # queries are plain SPARQL -- confirm against the VIVO list view docs.
    for tag in ('collated', 'critical-data-required'):
        list_view_xml = list_view_xml\
            .replace('<%s>' % tag, '')\
            .replace('</%s>' % tag, '')
    return list_view_xml
def process_listview_select(tree):
    """
    Extract the SELECT query and its variable names from a list view config.

    tree -- parsed XML element whose direct children include a
            <query-select> element holding the SPARQL SELECT query.

    Returns a tuple (select, field_list): the query text exactly as found in
    the XML, and the names of the variables listed between
    "SELECT DISTINCT" and " WHERE {" (without the leading "?"), in order.

    Raises ValueError if there is no <query-select> text or if the query
    does not have the "SELECT DISTINCT ?var ... WHERE {" shape.
    """
    select_element = tree.find('query-select')
    if select_element is None or select_element.text is None:
        raise ValueError('No <query-select> query found in list view config.')
    select = select_element.text
    # Pull out the fields from the select query so results can be put in a
    # list of dictionaries that's easier to read. Raw string: \s, \? and \{
    # are regex escapes, not Python string escapes.
    get_selects = re.compile(r"SELECT DISTINCT\s+(\?.*) WHERE \{", re.DOTALL)
    match = get_selects.search(select)
    if match is None:
        raise ValueError(
            'Could not find "SELECT DISTINCT ?var ... WHERE {" in query-select.')
    # The variable list may span several lines; flatten it, then split on
    # the "?" that starts each variable name.
    variables = match.group(1).replace('\n', ' ')
    field_list = [name.strip() for name in variables.split('?') if name.strip()]
    return (select, field_list)
# Values substituted for the ?subject and ?property placeholders in each
# CONSTRUCT query before it is sent to VIVO.
bindings = {
    'subject': '<http://vivo.school.edu/individual/jsmith>',
    'property': 'core:authorInAuthorship',
}

# Local graph that accumulates the output of every CONSTRUCT query; the
# SELECT query is then run against this graph rather than against VIVO.
g = Graph()
try:
    list_view_xml = pre_process_listview(sys.argv[1])
    root = ET.fromstring(list_view_xml)
    for construct_query in root.findall('query-construct'):
        query = construct_query.text\
            .replace('?subject', bindings['subject'])\
            .replace('?property', bindings['property'])
        logger.debug('SPARQL:\n%s', query)
        sparql.setQuery(query)
        results = sparql.queryAndConvert()
        g.parse(data=results, format='n3')

    select_query, field_list = process_listview_select(root)
    logger.debug('FINAL SPARQL SELECT:\n%s', select_query)

    # Pair each result row with the variable names from the SELECT clause so
    # the output is readable as name -> value.
    results_list = [dict(zip(field_list, row)) for row in g.query(select_query)]

    for row in results_list:
        pprint(row)
        # Blank line between rows. print('') emits an empty line under both
        # Python 2 and Python 3, unlike a bare `print`.
        print('')
finally:
    # Always end the VIVO session, even when a query or the parsing fails.
    sparql.logout()
"""
A subclass of SPARQLWrapper that will work with the built-in
SPARQL admin interface that is part of the VIVO web application.
At the moment, it only returns RS_JSON for SELECT queries and
N3 for CONSTRUCT and DESCRIBE queries.
Requires
- SPARQLWrapper
- requests - for maintaining sessions.
"""
from pprint import pprint
import urllib
import urllib2
from rdflib import Graph
import requests
from SPARQLWrapper import SPARQLWrapper, JSON, N3
from SPARQLWrapper.SPARQLExceptions import QueryBadFormed, EndPointNotFound, EndPointInternalError
from SPARQLWrapper.Wrapper import _SPARQL_JSON
# VIVO returns its RS_JSON results with the content type
# application/javascript, so add that type to the list imported from
# SPARQLWrapper.Wrapper (presumably the content types SPARQLWrapper treats as
# JSON -- TODO confirm).
# NOTE(review): _SPARQL_JSON is a private name of SPARQLWrapper; check that it
# still exists when upgrading that library.
_SPARQL_JSON.append('application/javascript')
class VIVOSparql(SPARQLWrapper):
    """
    Extension of SPARQLWrapper to work with the built-in VIVO
    SPARQL query interface. Eliminates the need to use Fuseki
    for SPARQL non-update queries.

    Typical use: construct with the VIVO base URL, call setCredentials(),
    login(), then setQuery() and one of the query methods, and finally
    logout().
    """

    def __init__(self, vivo_url, **kwargs):
        """
        vivo_url -- base URL of the VIVO web application, with or without a
                    trailing slash.
        kwargs   -- passed through to SPARQLWrapper.__init__ as keyword
                    arguments.
        """
        self.session = requests.session()
        # The endpoint, login and logout paths are appended to the base URL,
        # so make sure it ends with exactly the separator they rely on.
        if not vivo_url.endswith('/'):
            vivo_url += '/'
        self.vivo_url = vivo_url
        # Add the VIVO SPARQL end point path to the VIVO url.
        self.endpoint = vivo_url + 'admin/sparqlquery'
        # Expand kwargs; passing the dict positionally would hand it to
        # SPARQLWrapper as its second positional parameter instead.
        SPARQLWrapper.__init__(self, self.endpoint, **kwargs)
        # The resultFormat and rdfResultFormat are required
        # parameters for the VIVO SPARQL interface.
        # These are set to RS_JSON for SELECT and
        # N3 for CONSTRUCT. These can be overridden when
        # called but have not been tested.
        self.addCustomParameter('resultFormat', 'RS_JSON')
        self.addCustomParameter('rdfResultFormat', 'N3')

    def login(self):
        """
        Log in to VIVO with the credentials stored in self.user and
        self.passwd, and keep the session cookies in self.cookies as a
        ready-to-send Cookie header value.
        """
        payload = {
            'loginName': self.user,
            'loginPassword': self.passwd,
            'loginForm': 'Log in'
        }
        # NOTE(review): verify=False turns off TLS certificate verification
        # for the login request, which carries the password. Kept for
        # backward compatibility -- confirm whether it is still needed.
        self.session.post(self.vivo_url + 'authenticate',
                          data=payload,
                          verify=False)
        # A Cookie header is a list of name=value pairs separated by "; ".
        self.cookies = '; '.join(
            '%s=%s' % (name, value)
            for name, value in self.session.cookies.items())

    def logout(self):
        """
        End the VIVO session.

        Returns True when the logout request was answered with a 302
        redirect; raises Exception('Logout failed.') otherwise, including
        when the response involved no redirect at all.
        """
        resp = self.session.get(self.vivo_url + 'logout')
        # Check response history for logout: the first hop is expected to
        # be the 302 redirect.
        if resp.history and resp.history[0].status_code == 302:
            return True
        raise Exception('Logout failed.')

    def setQuery(self, query):
        """
        Set the query and choose the return format from its type: N3 for
        CONSTRUCT and DESCRIBE queries, JSON for everything else.

        query -- SPARQL query text.
        """
        # SPARQL keywords are case-insensitive, so compare in lower case
        # for both query forms.
        lowered = query.lower()
        if 'construct' in lowered or 'describe' in lowered:
            self.setReturnFormat(N3)
        else:
            self.setReturnFormat(JSON)
        SPARQLWrapper.setQuery(self, query)

    def _query(self):
        """
        Override _query method to use cookies acquired on login.

        Returns a tuple (response, return format).
        Raises QueryBadFormed on HTTP 400, EndPointNotFound on 404 and
        EndPointInternalError on 500; any other HTTPError is re-raised
        unchanged.
        """
        request = self._createRequest()
        opener = urllib2.build_opener()
        opener.addheaders.append(('Cookie', self.cookies))
        try:
            response = opener.open(request)
        except urllib2.HTTPError as e:
            if e.code == 400:
                raise QueryBadFormed()
            elif e.code == 404:
                raise EndPointNotFound()
            elif e.code == 500:
                raise EndPointInternalError(e.read())
            # Bare raise keeps the original traceback.
            raise
        return (response, self.returnFormat)

    def results_graph(self):
        """
        Shortcut for use with CONSTRUCT queries. Returns
        results as an RDFLib graph.
        """
        resp, rformat = self._query()
        # Graph.parse is given the lower-case format name.
        if rformat == 'N3':
            rformat = 'n3'
        g = Graph()
        g.parse(resp, format=rformat)
        return g
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment