Created
March 30, 2012 13:52
-
-
Save mhausenblas/2251694 to your computer and use it in GitHub Desktop.
Simple RDF store in ZooKeeper
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zookeeper, threading, sys | |
# ACL entry passed to zookeeper.create(): permission mask 0x1f for the
# "world" scheme with id "anyone" (i.e. no access restriction on the node).
ZOO_OPEN_ACL_UNSAFE = {"perms": 0x1f, "scheme": "world", "id": "anyone"}
class ZKSimpleRDFStore(object):
    """Tiny RDF named-graph store that keeps each graph in a ZooKeeper node.

    A named graph is addressed by a ZooKeeper path (for example ``/ng-0``)
    and its serialized content is stored as that node's data.
    """

    SERVER_PORT = 2181

    def __init__(self):
        """Initialise connection state and send ZooKeeper logging to a file.

        No session is opened here; call ``set_up()`` to connect.
        """
        self.host = "localhost:%d" % self.SERVER_PORT
        self.connected = False
        self.handle = -1
        try:
            log_file = open('simple-rdf-store.log', 'w')
        except IOError:
            # Best effort: the store still works without a dedicated logfile.
            print("Couldn't open logfile for writing")
        else:
            zookeeper.set_log_stream(log_file)

    def set_up(self):
        """Open the store's main ZooKeeper session.

        Blocks for at most 10 seconds while waiting for the session to be
        established. On success ``self.handle`` holds the session handle and
        ``self.connected`` is True.

        Raises:
            Exception: if the session is not connected within the timeout.
        """
        self.callback_flag = False
        self.cv = threading.Condition()
        self.connected = False

        def connection_watcher(handle, event_type, state, path):
            # The watcher is invoked for every session state change; only a
            # transition to the connected state means the handle is usable.
            if state != zookeeper.CONNECTED_STATE:
                return
            with self.cv:
                self.connected = True
                self.cv.notify()

        # The condition is held across init() so the watcher cannot notify
        # before we start waiting; `with` releases it even if init() raises.
        with self.cv:
            self.handle = zookeeper.init(self.host, connection_watcher)
            self.cv.wait(10.0)

        if not self.connected:
            # Do not leak the half-open session created by init().
            zookeeper.close(self.handle)
            self.handle = -1
            raise Exception("Couldn't connect to host - %s" % self.host)

    def new_con(self):
        """Open an additional ZooKeeper session and return its handle.

        Blocks for at most 15 seconds while waiting for the session to be
        established. The returned handle is not stored on this object, so
        ``shut_down()`` does not close it.

        Returns:
            The handle returned by ``zookeeper.init``.

        Raises:
            Exception: if the session is not connected within the timeout.
        """
        cv = threading.Condition()
        self.pending_connection = False

        def connection_watcher(handle, event_type, state, path):
            # Ignore every session event except the connected state.
            if state != zookeeper.CONNECTED_STATE:
                return
            with cv:
                self.pending_connection = True
                cv.notify()

        with cv:
            handle = zookeeper.init(self.host, connection_watcher)
            cv.wait(15.0)

        if not self.pending_connection:
            # Do not leak the half-open session created by init().
            zookeeper.close(handle)
            raise Exception("Couldn't connect to host - %s" % self.host)
        return handle

    def shut_down(self):
        """Close the main session if one is open; safe to call repeatedly."""
        if not self.connected:
            return
        try:
            zookeeper.close(self.handle)
        finally:
            # Reset the state so a second call does not close a stale handle.
            self.connected = False
            self.handle = -1

    def exists_ng(self, ng):
        """Return the result of ``zookeeper.exists`` for the path ``ng``.

        ``put_ng`` treats a truthy result as "the named graph exists".
        """
        return zookeeper.exists(self.handle, ng, None)

    def get_ng(self, ng):
        """Return the ``(data, stat)`` pair stored at the path ``ng``."""
        return zookeeper.get(self.handle, ng, None)

    def put_ng(self, ng, val):
        """Store ``val`` as named graph ``ng``, overwriting existing content.

        Args:
            ng: ZooKeeper path of the named graph.
            val: content to store as the node's data.

        Returns:
            The ``(data, stat)`` pair read back from the node after writing.
        """
        if self.exists_ng(ng):
            # set() is version-checked, so fetch the node's current version.
            (_, stat) = self.get_ng(ng)
            zookeeper.set(self.handle, ng, val, stat["version"])
        else:
            # NOTE: the node is created with the EPHEMERAL flag, so ZooKeeper
            # removes it when the creating session ends.
            zookeeper.create(self.handle, ng, val, [ZOO_OPEN_ACL_UNSAFE], zookeeper.EPHEMERAL)
        return self.get_ng(ng)
# Demo: connect to the local ZooKeeper, store one triple as named graph
# /ng-0, print what was stored, then close the session.
if __name__ == '__main__':
    zkSRS = ZKSimpleRDFStore()
    print('Set up of Simple RDF Store ...')
    zkSRS.set_up()
    try:
        (data, stat) = zkSRS.put_ng('/ng-0', '<http://data.example.org/person/tim> dc:publisher "Tim" .')
        print(stat)
        print(data)
    finally:
        # Always release the ZooKeeper session, even if the write fails.
        zkSRS.shut_down()
    print("Done")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment