""" | |
author: Peb Ruswono Aryan | |
date: 20.02.2014 | |
Simple Entity Resolution | |
read a text file (news) and resolve entities mentioned in the text | |
uses: | |
- external Part-of-Speech Tagger API (REST) | |
- Entity knowledge base over SPARQL with Regex filter | |
""" | |
import json
import requests
import sys

def levenshtein(a, b):
    """Calculate the Levenshtein distance between a and b.
    http://hetland.org/coding/python/levenshtein.py"""
    n, m = len(a), len(b)
    if n > m:
        # make sure n <= m, to use O(min(n, m)) space
        a, b = b, a
        n, m = m, n
    current = list(range(n + 1))
    for i in range(1, m + 1):
        previous, current = current, [i] + [0] * n
        for j in range(1, n + 1):
            add, delete = previous[j] + 1, current[j - 1] + 1
            change = previous[j - 1]
            if a[j - 1] != b[i - 1]:
                change += 1
            current[j] = min(add, delete, change)
    return current[n]
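
# Illustration: levenshtein("jakarta", "jayakarta") == 2, since two
# insertions turn the first string into the second.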

def run_sparql(host, query, fmt="application/json"):
    """POST a SPARQL query to `host` and return its decoded JSON response."""
    params = {
        "query": query,
        "debug": "on",
        "timeout": "",
        "format": fmt,
    }
    r = requests.post(host, params=params)
    if r.status_code == requests.codes.ok:
        return r.json()
    else:
        raise Exception("SPARQL Error: HTTP %d" % r.status_code)
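
# A conforming endpoint answers in the W3C SPARQL 1.1 Query Results JSON
# format; the resolution step below only reads
# result["results"]["bindings"][i][var]["value"].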

def remote_tag(url, txt):
    """Send `txt` to the remote POS-tagger service and return its tagged
    output, one sentence per list element."""
    params = {
        "teks": txt,
        "task": "postag"
    }
    r = requests.post(url, data=params)
    if r.status_code == requests.codes.ok:
        return r.text.split("\n")
    else:
        # fail loudly; the chunker below expects a list of tagged sentences
        raise Exception("POS tagger error: HTTP %d" % r.status_code)
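
# The chunker below assumes each tagged sentence comes back as
# space-separated "token/TAG" pairs, e.g.
# "Presiden/NNP Joko/NNP berkunjung/VB ./." (example output is illustrative).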

if __name__ == "__main__":
    NLP = "http://nlp.pebbie.net"
    ENDPOINT = "http://id.dbpedia.org/sparql"

    if len(sys.argv) < 2:
        sys.exit("usage: python %s <textfile>" % sys.argv[0])

    # read from file
    with open(sys.argv[1]) as f:
        txt = f.read()

    # use the web service to do POS tagging
    sent = remote_tag(NLP + '/handler', txt)

    # extract nouns & proper nouns into candidate entity chunks
    c_entities = []
    buffer = []
    stat = "non_ent"
    for s in sent:
        # split on the last "/" so tokens that themselves contain "/" stay intact
        tmp = [tuple(term.rsplit("/", 1)) for term in s.split(" ") if "/" in term]
        for lex, tag in tmp:
            if stat == "non_ent":
                if tag in ["NN", "NNP"]:
                    buffer.append(lex)
                    stat = "ent"
            elif stat == "ent":
                if tag in ["NN", "NNP"]:
                    buffer.append(lex)
                else:
                    chunk = " ".join(buffer)
                    if chunk not in c_entities:
                        c_entities.append(chunk)
                    buffer = []
                    stat = "non_ent"
        # flush a chunk that runs to the end of a sentence
        if buffer:
            chunk = " ".join(buffer)
            if chunk not in c_entities:
                c_entities.append(chunk)
            buffer = []
            stat = "non_ent"
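
    # Illustration: tagged input "Presiden/NNP Joko/NNP berkunjung/VB ke/IN
    # Bandung/NNP ./." yields the candidate chunks ["Presiden Joko", "Bandung"].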

    # a string counts as capitalized if its first character is an upper-case letter
    first_capital = lambda s: s[:1].isupper()

    # first filtering: keep only chunks containing a capitalized word
    cap_filter = True
    if cap_filter:
        output = []
        for ent in c_entities:
            if any(first_capital(w) for w in ent.split()):
                output.append(ent)
        c_entities = output
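
    # e.g. a chunk "presiden" alone is dropped while "Presiden Joko"
    # survives (illustrative).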
    # print(c_entities)

    # second filtering based on subword split:
    # ["A B C D", "C", "C D"] -> ["A B", "C D", "C"]
    sub_filter = True
    if sub_filter:
        output = []
        is_sub = {}
        for ent in c_entities:
            # collect the longer chunks that contain this one as a substring
            sup = [e for e in c_entities if e != ent and ent in e]
            if len(sup) == 0 and ent not in output:
                output.append(ent)
            else:
                is_sub[ent] = sup
        for ent, sup in is_sub.items():
            for esup in sup:
                if esup in output:
                    output.remove(esup)
                # split the longer chunk around the shared substring
                epos = esup.index(ent)
                first = esup[:epos].strip()
                rest = esup[epos:].strip()
                if first and first not in output:
                    if first_capital(first) or not cap_filter:
                        output.append(first)
                if rest and rest not in output:
                    if first_capital(rest) or not cap_filter:
                        output.append(rest)
        c_entities = output
    # print(c_entities)
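
    # Rationale: when a short mention like "C" also occurs on its own, a
    # longer chunk such as "A B C D" is likely two adjacent mentions, so it
    # is split around the shared part.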

    # resolve candidate mentions against the knowledge base via SPARQL (e.g. DBpedia)
    output = {}
    for entity in c_entities:
        try:
            # NB: the mention is interpolated into the regex unescaped, so
            # regex metacharacters in a mention can break the filter
            result = run_sparql(ENDPOINT, """SELECT DISTINCT ?ent ?lbl WHERE { ?ent rdfs:label ?lbl. FILTER(regex(?lbl, "%s", "i")) } LIMIT 10""" % entity)
            print('resolving "%s"...' % entity)
            # print(result["results"]["bindings"])
            candidate = []
            for cand in result["results"]["bindings"]:
                cand_pair = (cand["ent"]["value"], cand["lbl"]["value"])
                if cand_pair not in candidate:
                    candidate.append(cand_pair)
            if len(candidate) > 0:
                output[entity] = candidate
        except Exception:
            # skip mentions the endpoint fails to answer for
            continue
    c_entities = output
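
    # For a mention like "Bandung" (illustrative) the query sent is:
    #   SELECT DISTINCT ?ent ?lbl WHERE {
    #     ?ent rdfs:label ?lbl. FILTER(regex(?lbl, "Bandung", "i"))
    #   } LIMIT 10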

    # choose the best match per mention via string distance (Levenshtein)
    output = {}
    for entity, candidates in c_entities.items():
        ename = entity.lower().replace("_", "").replace(" ", "")
        # group candidate IRIs by normalized label
        tmp = {}
        for iri, label in candidates:
            kname = label.lower().replace("_", "").replace(" ", "")
            if kname not in tmp:
                tmp[kname] = [iri]
            else:
                tmp[kname].append(iri)
        if len(tmp) == 1:
            # accept if there's only one alternative
            output[entity] = next(iter(tmp.values()))
        else:
            # sort ascending by distance to the normalized mention
            sorted_candidates = sorted(
                ((kname, levenshtein(kname, ename)) for kname in tmp),
                key=lambda x: x[1])
            # accept only if the closest distance is at most the mention's length
            if sorted_candidates[0][1] <= len(entity):
                output[entity] = tmp[sorted_candidates[0][0]]
    c_entities = output

    print(json.dumps(c_entities, indent=2))
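
# The printed result maps each resolved mention to a list of knowledge-base
# IRIs, shaped like (values illustrative):
#   {"Bandung": ["http://id.dbpedia.org/resource/Bandung"]}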