Created
May 26, 2011 21:13
-
-
Save saghul/994096 to your computer and use it in GitHub Desktop.
Serialize SQLite operations with SQLObject and Twisted
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf8
# Copyright (C) 2011 Saúl Ibarra Corretgé <[email protected]>
#
# Names exported by a star-import of this module.
__all__ = ['Database', 'DatabaseError']
from functools import wraps
from threading import Thread

from sqlobject import connectionForURI, sqlhub, SQLObject, StringCol
from twisted.internet import reactor
from twisted.internet.threads import deferToThreadPool
from twisted.python.threadpool import ThreadPool
# A pool limited to exactly one worker thread: everything submitted to it
# runs on that single thread, one call after another.
pool = ThreadPool(minthreads=1, maxthreads=1, name='db-ops')
pool.start()
# Stop the worker thread right before the reactor shuts down.
reactor.addSystemEventTrigger('before', 'shutdown', pool.stop)
def run_in_db_thread(func):
    """Decorator to run DB queries in Twisted's thread pool.

    Calling the decorated function does not execute ``func`` in the caller's
    thread. Instead ``func`` and its arguments are handed to
    ``deferToThreadPool`` together with the module-level ``pool``, and the
    value returned by ``deferToThreadPool`` is what the caller gets back.

    :param func: the callable to run on the pool.
    :returns: a wrapper with the same name and docstring as ``func``.
    """
    @wraps(func)  # keep func's __name__/__doc__ visible on the wrapper
    def wrapper(*args, **kw):
        return deferToThreadPool(reactor, pool, func, *args, **kw)
    return wrapper
class Users(SQLObject):
    """SQLObject model describing a user with three string columns."""

    # Columns are declared in the same order as before.
    nickname = StringCol()
    full_name = StringCol()
    email = StringCol()
class DatabaseError(Exception):
    """Error raised for failed database lookups."""
class Database(object):
    """Access to the users database.

    The methods decorated with ``run_in_db_thread`` are not executed in the
    caller's thread; the caller receives whatever the decorator's wrapper
    returns instead of the method's own return value.
    """

    def __init__(self, dburi=None):
        """Store the connection URI and kick off initialization.

        :param dburi: connection URI passed to ``connectionForURI``; when it
            is omitted or empty, ``'sqlite:/:memory:'`` is used.
        """
        self._uri = dburi or 'sqlite:/:memory:'
        # initialize() is dispatched through run_in_db_thread, so it may not
        # have run yet when __init__ returns. Give ``connected`` a defined
        # value in the meantime instead of leaving the attribute missing.
        self.connected = False
        self.initialize()

    def _create_table(self, klass):
        """Create the table for ``klass`` unless it already exists.

        While the table is being created the connection's ``debug`` flag is
        forced to ``True``; its previous value is restored afterwards, even
        if ``createTable`` raises.
        """
        if klass.tableExists():
            return
        print('Table %s does not exists. Creating it now.'
              % klass.sqlmeta.table)
        saved = klass._connection.debug
        try:
            klass._connection.debug = True
            klass.createTable()
        finally:
            klass._connection.debug = saved

    @run_in_db_thread
    def initialize(self):
        """Connect to the database and create any missing tables.

        Sets ``self.connected`` to ``True`` when the connection could be
        established. On any connection error the error is printed,
        ``self.connected`` is set to ``False`` and no tables are created.
        """
        try:
            conn = connectionForURI(self._uri)
            sqlhub.processConnection = conn
        except Exception as e:
            print('Error connection with the DB: %s' % e)
            self.connected = False
            return
        self.connected = True
        for klass in [Users]:  # We'd initialize all SQLObjects here
            self._create_table(klass)

    @run_in_db_thread
    def create_user(self, nickname, fullname, email):
        """Create and return a ``Users`` object from the given values."""
        return Users(nickname=nickname, full_name=fullname, email=email)

    @run_in_db_thread
    def get_user_data(self, nickname):
        """Return the first ``Users`` object matching ``nickname``.

        :raises DatabaseError: if no user with that nickname is found.
        """
        try:
            return Users.selectBy(nickname=nickname)[0]
        except IndexError:
            raise DatabaseError("User %s doesn't exist" % nickname)
def main():
    """Create a few sample users, then look one up and print its details.

    The lookup result is delivered through callbacks attached to the value
    returned by ``get_user_data``: ``got_result`` on success, ``got_error``
    on failure.
    """
    def got_result(user):
        # Success callback: print the fields of the user that was found.
        print('User info:')
        print('\tNickname: %s' % user.nickname)
        print('\tFull name: %s' % user.full_name)
        print('\tEmail address: %s' % user.email)

    def got_error(error):
        # Failure callback: print the message carried by the failure.
        print('Got error! %s' % error.getErrorMessage())

    db = Database()
    # Sample rows, created in this exact order.
    sample_users = [
        ('saghul', 'saghul', '[email protected]'),
        ('saghul2', 'saghul2', '[email protected]'),
        ('saghu3', 'saghul3', '[email protected]'),
        ('saghul4', 'saghul4', '[email protected]'),
    ]
    for nickname, fullname, email in sample_users:
        db.create_user(nickname, fullname, email)
    d = db.get_user_data('saghul')
    d.addCallbacks(got_result, got_error)
def run_reactor():
    """Run the Twisted reactor without installing its signal handlers."""
    reactor.run(installSignalHandlers=False)
def reactor_stop():
    """Sleep for three seconds, then schedule the reactor to stop."""
    import time

    time.sleep(3)
    # The stop request is handed over with callFromThread rather than by
    # calling reactor.stop() directly from this thread.
    reactor.callFromThread(reactor.stop)
if __name__ == '__main__':
    # The reactor runs in a separate thread; this thread queues the demo
    # operations and afterwards asks the reactor to stop.
    reactor_thread = Thread(target=run_reactor)
    reactor_thread.start()
    main()
    reactor_stop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment