Created
February 18, 2010 07:53
-
-
Save rcampbell/307455 to your computer and use it in GitHub Desktop.
Querying OpenCalais RDF models and MapReducing the results
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
(ns calais.rdf | |
(:use [clojure.http.client] | |
[clojure.contrib.str-utils :only [re-gsub]] | |
[clojure.contrib.seq-utils :only [frequencies flatten partition-all]]) | |
(:require [clojure.contrib.str-utils2 :as str-utils]) | |
(:import [java.io File FileInputStream] | |
[com.hp.hpl.jena.rdf.model Model ModelFactory] | |
[com.hp.hpl.jena.query QueryExecutionFactory QueryFactory])) | |
(defn load-model
  "Reads the RDF content of `file` into a fresh Jena default model and
  returns the result of Model.read (the populated model). The base URI
  passed to Model.read is nil. The input stream is closed once reading
  has finished, whether or not it succeeded."
  [file]
  ;; with-open guarantees the FileInputStream is closed; previously it leaked
  ;; one file handle per loaded document.
  (with-open [in (FileInputStream. file)]
    (.read (ModelFactory/createDefaultModel) in nil)))
(defn- load-models
  "Returns a lazy seq holding one loaded model (via load-model) for each
  entry that File.listFiles reports for the directory `dir`."
  [dir]
  (->> (.listFiles dir)
       (map load-model)))
(defn- ask
  "Runs the SPARQL SELECT in the spec's :query string against `model` and
  returns a fully realized seq containing the spec's :handler applied to
  each query solution.

  The first argument is a query spec map with keys :query (SPARQL text)
  and :handler (a fn of one query solution)."
  [{:keys [query handler]} model]
  (with-open [execution (QueryExecutionFactory/create
                          (QueryFactory/create query) model)]
    ;; Apply the handler and realize everything while the execution is
    ;; still open. Otherwise handler calls run lazily after with-open has
    ;; closed it, and any handler exception surfaces at some later consumer
    ;; instead of here.
    (doall (map handler (iterator-seq (.execSelect execution))))))
(defn literals
  "Returns a handler fn suitable for `ask`. Given a query solution, the
  handler returns a map from (keyword field) to the string form of
  (.getLiteral solution field), for each of the given `fields`."
  [& fields]
  ;; Keywords depend only on `fields`, so compute them once per handler
  ;; rather than once per solution.
  (let [ks (map keyword fields)]
    (fn [solution]
      (zipmap ks (map #(str (.getLiteral solution %)) fields)))))
;; SPARQL PREFIX declarations prepended to every query:
;;   rdf: -> W3C RDF syntax namespace
;;   c:   -> OpenCalais predicate namespace
(def prefixes
  (apply str ["PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#> "
              "PREFIX c: <http://s.opencalais.com/1/pred/> "]))
;; Named SPARQL queries run against each OpenCalais RDF model. Each entry
;; holds the :query text (starting with `prefixes`) and a :handler that
;; turns one query solution into a map of literal strings.
(def queries
  (let [;; Text of a query for an entity type whose relevance is attached
        ;; through a separate RelevanceInfo node pointing at the entity.
        relevance-query
        (fn [subject-var type-name]
          (str prefixes
               "SELECT ?name ?relevance "
               "WHERE {"
               " ?" subject-var " rdf:type <http://s.opencalais.com/1/type/em/e/" type-name "> . "
               " ?" subject-var " c:name ?name . "
               " ?relevance_info rdf:type <http://s.opencalais.com/1/type/sys/RelevanceInfo> . "
               " ?relevance_info c:subject ?" subject-var " . "
               " ?relevance_info c:relevance ?relevance "
               " }"))
        ;; Full query spec (query text + handler) for such an entity type.
        relevance-spec
        (fn [subject-var type-name]
          {:query   (relevance-query subject-var type-name)
           :handler (literals "name" "relevance")})]
    {:companies      {:query   (str prefixes
                                    "SELECT ?company ?docId ?ticker ?name ?score "
                                    "WHERE {"
                                    " ?company rdf:type <http://s.opencalais.com/1/type/er/Company> . "
                                    " ?company c:docId ?docId . "
                                    " ?company c:ticker ?ticker . "
                                    " ?company c:name ?name . "
                                    " ?company c:score ?score . "
                                    " FILTER (?ticker != \"IDCHS\") "
                                    " }")
                      :handler (literals "name" "ticker" "score")}
     :categories     {:query   (str prefixes
                                    "SELECT ?name ?score "
                                    "WHERE {"
                                    " ?category rdf:type <http://s.opencalais.com/1/type/cat/DocCat> . "
                                    " ?category c:categoryName ?name . "
                                    " ?category c:score ?score "
                                    " }")
                      :handler (literals "name" "score")}
     :technologies   (relevance-spec "technology" "Technology")
     :industry-terms (relevance-spec "industry_term" "IndustryTerm")
     :people         (relevance-spec "person" "Person")
     :products       (relevance-spec "product" "Product")}))
(defn ask-all
  "Runs every query in `queries` against the RDF model loaded from `file`.
  Returns a map from each query key to the fully realized seq of handler
  results. If anything throws, the stack trace is printed and {} is
  returned (best-effort: one bad file does not stop the batch)."
  [file]
  (try
    ;; Parse the file once and share the model across all queries.
    (let [model (load-model file)]
      ;; doall forces the handler work to run inside this try, so its
      ;; exceptions are caught here rather than by some later lazy consumer.
      (zipmap (keys queries)
              (map #(doall (ask % model)) (vals queries))))
    (catch Exception e (.printStackTrace e) {})))
; --- tag reduction --- | |
;; Tag names collected so far, keyed by query (names concatenated across
;; documents): {:companies ("IBM" "Apple" "IBM")}
(def tags (ref {}))
;; Per-query tag counts: {:companies {"IBM" 2 "Apple" 5}}
(def freq (ref {}))
;; Tag name (with "." removed) -> set of document ids it appears in:
;; {"IBM" #{"file2" "file6" "file42"}}
(def xref (ref {}))
(defn project-name
  "Given a map of query key -> seq of result maps, returns a map with the
  same keys whose values are the seqs of each result's :name."
  [m]
  (zipmap (keys m) (map #(map :name %) (vals m))))
(defn tag-frequencies
  "Given a map of query key -> seq of tag names, returns a map with the
  same keys whose values map each tag name to the number of occurrences
  of that tag."
  [m]
  (zipmap (keys m) (map frequencies (vals m))))
(defn add-tag
  "Merges `model` (a map of query key -> seq of tag names) into the `tags`
  ref, appending each seq after the tags already collected under its key."
  [model]
  ;; Store values as vectors and append with `into`. Merging with `concat`
  ;; instead nests one lazy seq per document, and realizing that chain after
  ;; many documents can throw StackOverflowError. Element order is unchanged.
  (let [as-vectors (zipmap (keys model) (map vec (vals model)))]
    (dosync (alter tags #(merge-with into %1 %2) as-vectors))))
(defn add-freq
  "Adds the per-query tag counts of `model` (as computed by tag-frequencies)
  into the `freq` ref, summing counts for tags that are already present."
  [model]
  (dosync
    (alter freq
           (partial merge-with (partial merge-with +))
           (tag-frequencies model))))
(defn add-xref
  "Records document `id` under every tag name in `model` (a map of query
  key -> seq of tag names) in the `xref` ref. Periods are removed from tag
  names before they are used as keys; each value is a set of document ids."
  [id model]
  ;; A single transaction per document, so a document's tags are added
  ;; together rather than through one transaction per tag.
  (dosync
    (doseq [tag (map #(re-gsub #"\." "" %) (flatten (vals model)))]
      (alter xref update-in [tag] #(conj (or % #{}) id)))))
(defn reduce-tags
  "For each file in `files`, runs all queries (ask-all), keeps only the tag
  names (project-name), and adds the result to the `tags`, `freq` and `xref`
  refs. The document id is the file name with its last 4 characters removed
  (presumably a 4-character extension such as \".rdf\" -- confirm)."
  [files]
  (let [parse-name (comp project-name ask-all)]
    (doseq [file files]
      (let [id    (str-utils/butlast (.getName file) 4)
            model (parse-name file)]
        (add-tag model)
        (add-freq model)
        (add-xref id model)))))
(defn parallelize-reduction
  "Splits the files of directory `dir` into at most one batch per available
  processor and runs reduce-tags on the batches in parallel via pmap.
  Returns nil once every batch has been processed."
  [dir]
  (let [files      (.listFiles dir)
        processors (.availableProcessors (Runtime/getRuntime))
        ;; Ceiling division, never below 1. With truncating division, fewer
        ;; files than processors gave a batch size of 0, which makes
        ;; partition-all emit empty batches forever, so dorun never returned.
        batch-size (max 1 (quot (+ (count files) (dec processors)) processors))]
    (dorun (pmap reduce-tags (partition-all batch-size files)))))
; --- tag filtering and sorting --- | |
;; Rules filter-with applies when none are given: keep a map entry only if
;; its value is greater than 3 and its key is not "Web rights".
(def default-rules
  [(fn [entry] (> (val entry) 3))
   (fn [entry] (not (= "Web rights" (key entry))))])
(defn filter-with
  "Returns a lazy seq of the entries of map `m` that satisfy every predicate
  in `rules` (checking stops at the first falsy rule). With only `m`,
  `default-rules` is used."
  ([m] (filter-with default-rules m))
  ([rules m]
    (letfn [(passes? [entry]
              (every? (fn [rule] (rule entry)) rules))]
      (filter passes? m))))
(defn sorted-by-val
  "Returns a sorted set of the [key value] pairs of `m`, ordered by value
  descending; pairs with equal values are ordered by key descending."
  [m]
  (let [by-val-desc (fn [a b]
                      ;; Comparing b's [value key] against a's gives
                      ;; descending order.
                      (compare [(second b) (first b)]
                               [(second a) (first a)]))]
    (apply sorted-set-by by-val-desc m)))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment