Last active
December 11, 2015 08:58
-
-
Save matthewstory/4576817 to your computer and use it in GitHub Desktop.
An example of how to coordinate writes from many forked processes to a single Xapian database, using a file lock to serialize the writes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fcntl | |
import time | |
import os | |
import sys | |
import shutil | |
import errno | |
import xapian as _x | |
# PIDs of forked children that the parent still has to wait on
_pids = []
# directory holding the xapian database
_DB = "./test.db"
# number of child processes to fork
_FORKS = 10
# Make sure our test is clean: remove any database directory left over from
# a previous run, then recreate it empty.
try:
    shutil.rmtree(_DB)
except OSError as e:
    # a missing directory simply means there is nothing to clean up; any
    # other failure is re-raised
    if e.errno != errno.ENOENT:
        # bare raise keeps the original traceback intact
        raise
os.mkdir(_DB)
def _safe_db_do(db, attr, *args, **kwargs):
    '''Call ``db.<attr>(*args, **kwargs)`` and return its result.

    Another process may modify the database at any moment, so the handle is
    reopened before every attempt, and the call is repeated for as long as
    it raises DatabaseModifiedError.'''
    while True:
        # refresh the handle before each try
        db.reopen()
        try:
            method = getattr(db, attr)
            outcome = method(*args, **kwargs)
        except _x.DatabaseModifiedError:
            # the database changed underneath us -- go around again
            continue
        else:
            return outcome
def _fork_term(fork_num):
    '''Build the term that identifies a given fork: the "XHELO" prefix
    followed by the string form of fork_num'''
    prefix = "XHELO"
    return prefix + str(fork_num)
def _writable():
    '''Blocking wait on the write lock, then open the writable DB.

    Returns a ``(lockfd, database)`` tuple. The caller owns both and is
    responsible for closing the database and then closing ``lockfd``.

    If taking the lock or opening the database fails, the lock-file
    descriptor is closed here and the original exception propagates.'''
    # open (creating it if needed) the lock-file, in the DB dir, to flock
    # against
    lockfd = os.open(os.path.join(_DB, ".writelock"), os.O_CREAT, 0o666)
    try:
        # exclusive lock: blocks while another process holds it
        fcntl.flock(lockfd, fcntl.LOCK_EX)
        return lockfd, _x.WritableDatabase(_DB, _x.DB_CREATE_OR_OPEN)
    except BaseException:
        # BaseException so that an interrupt while blocked in flock() does
        # not leave the descriptor open; the bare raise keeps the original
        # traceback
        os.close(lockfd)
        raise
# Fork _FORKS children. Each child takes the write lock, writes one document
# of its own (id i+2) plus the shared, contended document (id 1), reads both
# back, and exits with its fork number as exit status.
for i in range(_FORKS):
    pid = os.fork()
    if pid != 0:
        # parent: remember the child so it can be waited on later
        _pids.append(pid)
        continue

    # child fork
    lockfd, writable = _writable()
    try:
        print("lock acquired by: %s" % os.getpid())
        readable = _x.Database(_DB)
        try:
            # sleep here only to exaggerate the serialization
            time.sleep(0.5)
            doc = _x.Document()
            term = _fork_term(i+2)
            doc.add_term(term)
            # add something new, and modify something contended
            writable.replace_document(i+2, doc)
            writable.replace_document(1, doc)
            # you must commit, or your changes may not be picked up by
            # other forks, when many are reading
            writable.commit()
            # read back while the lock is still held, so the contested
            # document cannot have been replaced by another fork yet
            for t in _safe_db_do(readable, 'get_document', i+2).termlist():
                print("new: %s; should be: %s" % (t.term, term))
            for t in _safe_db_do(readable, 'get_document', 1).termlist():
                print("contested: %s; should be: %s" % (t.term, term))
        finally:
            readable.close()
    finally:
        # Close the writer BEFORE giving up the file lock: otherwise, on an
        # error path, the next fork could get the lock while this writer is
        # still open. The nested finally guarantees the descriptor is closed
        # even if closing the database raises.
        try:
            writable.close()
        finally:
            os.close(lockfd)
    # os._exit() skips normal interpreter cleanup, including flushing stdio
    # buffers, so flush by hand or the output above is lost when stdout is
    # not a terminal
    sys.stdout.flush()
    # cleanup -- Exit with the i number, to determine (cheaply) which
    # fork exited last
    os._exit(i)
# wait for the forks to cycle out; last_exit ends up holding the exit status
# (i.e. the fork number) of the last child that exited normally
last_exit = 0
while _pids:
    pid, status = os.waitpid(-1, 0)
    exited_normally = os.WIFEXITED(status)
    if not exited_normally:
        sys.stderr.write("abnormal termination of fork: %s\n" % pid)
    _pids.remove(pid)
    # WEXITSTATUS is only meaningful when WIFEXITED is true, so a child that
    # terminated abnormally must not overwrite last_exit
    if exited_normally:
        last_exit = os.WEXITSTATUS(status)
# Final check from the parent: print each fork's own document next to the
# term it should carry, then print the contested document (id 1) next to
# the term of the fork that exited last.
readable = _x.Database(_DB)
for doc_id in range(2, _FORKS + 2):
    expected = _fork_term(doc_id)
    document = _safe_db_do(readable, 'get_document', doc_id)
    for entry in document.termlist():
        print("uncontested: %s; should be: %s" % (entry.term, expected))
# demonstrate the contested value matches the last exiting fork
winner = _fork_term(last_exit+2)
contested = _safe_db_do(readable, 'get_document', 1)
for entry in contested.termlist():
    print("contested: %s; should be: %s" % (entry.term, winner))
sys.exit(0)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment