Created
January 24, 2011 21:20
-
-
Save saghul/793982 to your computer and use it in GitHub Desktop.
Twisted + SQLObject example
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# coding=utf8 | |
# Copyright (C) 2011 Saúl Ibarra Corretgé <[email protected]> | |
# | |
__all__ = ['Database', 'DatabaseError'] | |
from functools import wraps

from sqlobject import connectionForURI, sqlhub, SQLObject, StringCol
from twisted.internet import reactor
from twisted.internet.threads import deferToThread
def defer_to_thread(func):
    """Decorator to run DB queries in Twisted's thread pool.

    Calling the decorated function does not run *func* directly; it hands
    *func* and its arguments to ``deferToThread`` and returns whatever that
    call returns (a Deferred for the eventual result).
    """
    # wraps() keeps the original __name__/__doc__ on the wrapper so decorated
    # methods stay recognisable in tracebacks and introspection.
    @wraps(func)
    def wrapper(*args, **kw):
        return deferToThread(func, *args, **kw)
    return wrapper
class Users(SQLObject):
    """SQLObject model describing one user record."""

    # Key used by Database.get_user_data to look a user up.
    nickname = StringCol()
    # Printed as "Full name" by the example in main().
    full_name = StringCol()
    # Printed as "Email address" by the example in main().
    email = StringCol()
class DatabaseError(Exception):
    """Raised when a database lookup cannot be satisfied (e.g. unknown user)."""
class Database(object):
    """Small facade over the process-wide SQLObject connection.

    Attributes:
        connected: True only when a connection was successfully created and
            installed on ``sqlhub``.
    """

    def __init__(self, dburi):
        """Connect to the database identified by *dburi*.

        Problems are reported on stdout instead of being raised; callers
        should check ``self.connected`` before relying on the database.

        URIs containing ``:memory:`` are rejected.
        NOTE(review): presumably because queries run in worker threads and an
        in-memory SQLite database is private to one connection -- confirm.
        """
        if ':memory:' in dburi:
            print('SQLite in-memory DB is not supported')
            dburi = None
        self._uri = dburi
        # Stay "not connected" unless a connection is actually established.
        # (This includes the rejected-URI case, where no connection exists.)
        self.connected = False
        if self._uri is None:
            return
        try:
            conn = connectionForURI(self._uri)
            sqlhub.processConnection = conn
        except Exception as e:
            # Best-effort by design: report the failure and leave
            # self.connected False rather than propagating the error.
            print('Error connection with the DB: %s' % e)
        else:
            self.connected = True

    def _create_table(self, klass):
        """Create the table for the SQLObject class *klass* if it is missing."""
        if klass.tableExists():
            return
        print('Table %s does not exists. Creating it now.' % klass.sqlmeta.table)
        # Temporarily switch the connection's debug flag on while creating the
        # table (presumably so the generated SQL is echoed), and always
        # restore the previous value afterwards.
        saved = klass._connection.debug
        try:
            klass._connection.debug = True
            klass.createTable()
        finally:
            klass._connection.debug = saved

    def initialize(self):
        """Ensure the tables for all known models exist; no-op when not connected."""
        if not self.connected:
            return
        for klass in (Users,):  # We'd initialize all SQLObjects here
            self._create_table(klass)

    @defer_to_thread
    def get_user_data(self, nickname):
        """Fetch the ``Users`` row whose nickname equals *nickname*.

        Wrapped by ``defer_to_thread``, so callers receive the result of
        deferring this query to a thread rather than the row itself.

        Raises:
            DatabaseError: if no user with that nickname exists.
        """
        try:
            user = Users.selectBy(nickname=nickname)[0]
        except IndexError:
            raise DatabaseError("User %s doesn't exist" % nickname)
        else:
            return user
def main():
    """Run the example: look up user 'saghul', print the outcome, then stop.

    Must be called with the reactor running (or about to run), since the
    result arrives asynchronously through a Deferred.
    """
    def got_result(user):
        # Success path: dump the fields of the returned Users row.
        print('User info:')
        print('\tNickname: %s' % user.nickname)
        print('\tFull name: %s' % user.full_name)
        print('\tEmail address: %s' % user.email)

    def got_error(error):
        # Failure path: *error* is the Failure passed to the errback.
        print('Got error! %s' % error.getErrorMessage())

    db = Database('sqlite:///tmp/test.sqlite')
    db.initialize()
    d = db.get_user_data('saghul')
    d.addCallback(got_result)
    d.addErrback(got_error)
    # Whatever the outcome, shut the reactor down so the example exits
    # instead of idling forever after printing.
    d.addBoth(lambda _: reactor.stop())
if __name__ == '__main__':
    # Schedule main() to run once the reactor has started, then start the
    # event loop; reactor.run() blocks until the reactor is stopped.
    reactor.callLater(0, main)
    reactor.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment