Skip to content

Instantly share code, notes, and snippets.

@niwinz
Forked from FSX/async_psycopg2.py
Created April 19, 2012 20:53
Show Gist options
  • Save niwinz/2424112 to your computer and use it in GitHub Desktop.
Save niwinz/2424112 to your computer and use it in GitHub Desktop.
A module for asynchronous PostgreSQL queries in Tornado.
#!/usr/bin/env python
__author__ = 'Frank Smit <[email protected]>'
__version__ = '0.1.0'
import functools
import psycopg2
from tornado.ioloop import IOLoop, PeriodicCallback
class Pool(object):
    """A connection pool that manages asynchronous PostgreSQL connections.

    Connections are opened with ``psycopg2.connect(*args, **kwargs)`` and
    driven to readiness by `Poller`; a connection only joins the pool once
    its poller reports it as ready.
    """

    def __init__(self, min_conn, max_conn, cleanup_timeout, *args, **kwargs):
        """Open `min_conn` connections and optionally start the cleaner.

        :param min_conn: number of connections opened up front and kept by
            `_clean_pool`.
        :param max_conn: upper bound on the number of pooled connections.
        :param cleanup_timeout: interval between `_clean_pool` runs; it is
            multiplied by 1000 before being handed to `PeriodicCallback`.
            A value <= 0 disables the periodic cleanup.
        :param args: positional arguments forwarded to `psycopg2.connect`.
        :param kwargs: keyword arguments forwarded to `psycopg2.connect`.
        """
        self.min_conn = min_conn
        self.max_conn = max_conn
        self.closed = False
        self._args = args
        self._kwargs = kwargs
        self._pool = []
        # Always defined so that `close` can check it unconditionally.
        self._cleaner = None

        for _ in range(self.min_conn):
            self._new_conn()

        # Create a periodic callback that tries to close inactive connections
        if cleanup_timeout > 0:
            self._cleaner = PeriodicCallback(self._clean_pool,
                                             cleanup_timeout * 1000)
            self._cleaner.start()

    def _new_conn(self, new_cursor_args=None):
        """Create a new connection.

        If `new_cursor_args` is provided, `_new_cursor` is called with those
        arguments (plus the new connection) once the connection is ready.

        :raises PoolError: when the pool already holds `max_conn` connections.
        """
        # `>=` (not `>`): a pool that already holds max_conn is full.
        if len(self._pool) >= self.max_conn:
            raise PoolError('connection pool exausted')

        conn = psycopg2.connect(*self._args, **self._kwargs)
        add_conn = functools.partial(self._add_conn, conn)

        if new_cursor_args:
            # Copy so the caller's dict is not mutated.
            cursor_args = dict(new_cursor_args, connection=conn)
            new_cursor = functools.partial(self._new_cursor, **cursor_args)
            Poller(conn, (add_conn, new_cursor)).start()
        else:
            Poller(conn, (add_conn,)).start()

    def _add_conn(self, conn):
        """Add a connection to the pool.

        Used by `_new_conn` as the callback that registers the connection
        once it has been established.
        """
        self._pool.append(conn)

    def _new_cursor(self, function, func_args=(), callback=None, connection=None):
        """Create a cursor and invoke the cursor method named `function`.

        If no connection is passed and none is free, a new connection is
        created and `_new_cursor` is called again after it has been made.

        :param function: name of the cursor method to call (e.g. 'execute').
        :param func_args: positional arguments for that cursor method.
        :param callback: optional callable; it receives the cursor when the
            operation has completed.
        :param connection: connection to use instead of looking for a free one.
        """
        if connection is None:
            connection = self._get_free_conn()
            if connection is None:
                self._new_conn({
                    'function': function,
                    'func_args': func_args,
                    'callback': callback,
                })
                return

        cursor = connection.cursor()
        getattr(cursor, function)(*func_args)

        # Callbacks from cursor functions always get the cursor back. Without
        # a callback the connection is still polled so the operation runs to
        # completion (functools.partial(None, ...) would raise TypeError).
        callbacks = ()
        if callback is not None:
            callbacks = (functools.partial(callback, cursor),)
        Poller(cursor.connection, callbacks).start()

    def _get_free_conn(self):
        """Return a connection that is not executing, or `None` if none is free.

        :raises PoolError: when the pool is closed.
        """
        if self.closed:
            raise PoolError('connection pool is closed')
        for conn in self._pool:
            if not conn.isexecuting():
                return conn
        return None

    def _clean_pool(self):
        """Close idle connections in excess of `min_conn`.

        Walks the pool once, closing connections that are not executing until
        only `min_conn` remain or no further idle connection is found. Busy
        connections are always kept.

        :raises PoolError: when the pool is closed.
        """
        if self.closed:
            raise PoolError('connection pool is closed')

        excess = len(self._pool) - self.min_conn
        if excess <= 0:
            return

        # Build the list of survivors instead of popping by index: popping
        # ascending indexes shifts the remaining items and removes the wrong
        # connections.
        survivors = []
        for conn in self._pool:
            if excess > 0 and not conn.isexecuting():
                conn.close()
                excess -= 1
            else:
                survivors.append(conn)
        self._pool[:] = survivors

    def execute(self, operation, parameters=(), callback=None):
        """http://initd.org/psycopg/docs/cursor.html#cursor.execute
        """
        self._new_cursor('execute', (operation, parameters), callback)

    def executemany(self, operation, parameters=None, callback=None):
        """http://initd.org/psycopg/docs/cursor.html#cursor.executemany
        """
        self._new_cursor('executemany', (operation, parameters), callback)

    def callproc(self, procname, parameters=None, callback=None):
        """http://initd.org/psycopg/docs/cursor.html#cursor.callproc
        """
        self._new_cursor('callproc', (procname, parameters), callback)

    def close(self):
        """Stop the periodic cleaner and close all open connections.

        :raises PoolError: when the pool is already closed.
        """
        if self.closed:
            raise PoolError('connection pool is closed')

        # Stop the cleaner first; otherwise it keeps firing on a closed pool.
        cleaner = getattr(self, '_cleaner', None)
        if cleaner is not None:
            cleaner.stop()
            self._cleaner = None

        for conn in self._pool:
            if not conn.closed:
                conn.close()
        self._pool = []
        self.closed = True
class Poller(object):
    """Poll a PostgreSQL connection and fire callbacks once it is ready.

    The connection is polled immediately on `start`. While psycopg2 reports
    that it needs to read or write, the poller registers itself on the
    IOLoop for the connection's file descriptor and polls again when that
    event fires. When the state is `POLL_OK` every callback is invoked, in
    order, with no arguments.
    """

    def __init__(self, connection, callbacks=()):
        self._ioloop = IOLoop.instance()
        self._connection = connection
        self._callbacks = callbacks

    def start(self):
        """Start polling the connection.
        """
        self._update_handler()

    def _update_handler(self):
        """Poll once, then either run the callbacks or wait for more I/O."""
        state = self._connection.poll()
        ext = psycopg2.extensions

        if state == ext.POLL_OK:
            for notify in self._callbacks:
                notify()
            return

        if state == ext.POLL_READ:
            events = IOLoop.READ
        elif state == ext.POLL_WRITE:
            events = IOLoop.WRITE
        else:
            # Any other state is ignored.
            return

        self._ioloop.add_handler(self._connection.fileno(),
                                 self._io_callback, events)

    def _io_callback(self, *args):
        """IOLoop handler: unregister the descriptor, then poll again."""
        fd = self._connection.fileno()
        self._ioloop.remove_handler(fd)
        self._update_handler()
class PoolError(Exception):
    """Raised by `Pool` when it is closed or cannot take more connections."""
#!/usr/bin/env python
from os import path
import tornado.options
import tornado.web
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.options import define, options
import psycopg2
from async_psycopg2 import Pool
# Register the integer `port` command-line option (default 8888) with
# tornado.options.
define('port', default=8888, help='run on the given port', type=int)
class Application(tornado.web.Application):
    """Demo Tornado application wiring the three handlers together.

    The `db` attribute starts out as `None`; `BaseHandler.db` assigns the
    connection pool to it on first access.
    """

    def __init__(self):
        base_dir = path.dirname(__file__)

        routes = [
            (r'/', MainHandler),
            (r'/test1', Test1Handler),
            (r'/test2', Test2Handler)
        ]

        # NOTE(review): the cookie secret is hard-coded in source; fine for a
        # demo, but it should come from configuration in a real deployment.
        settings = {
            'template_path': path.join(base_dir, 'templates'),
            'static_path': path.join(base_dir, 'static'),
            'xsrf_cookies': True,
            'cookie_secret': 'dsfretghj867544wgryjuyki9p9lou67543/Vo=',
        }

        tornado.web.Application.__init__(self, routes, **settings)
        self.db = None
class BaseHandler(tornado.web.RequestHandler):
    """Request handler base class exposing the shared connection pool."""

    @property
    def db(self):
        """Return the application's `Pool`, creating it on first access.

        The pool is created with min_conn=1, max_conn=20 and a cleanup
        timeout of 10, and is stored on the application object so that all
        handlers share it.
        """
        app = self.application
        if not app.db:
            connect_kwargs = {
                'host': 'localhost',
                'database': 'infunadb',
                'user': 'infuna',
                'password': 'password',
                'async': 1
            }
            app.db = Pool(1, 20, 10, **connect_kwargs)
        return app.db
class MainHandler(BaseHandler):
    """Handler for `/`: runs a trivial query and replies 'Hello, world'."""

    @tornado.web.asynchronous
    def get(self):
        """Issue the query; the response is finished in `_on_response`."""
        self.db.execute('SELECT 42, 12, 40, 11;', callback=self._on_response)

    def _on_response(self, cursor):
        """Print the fetched rows, release the cursor and finish the request.

        :param cursor: the cursor the query was executed on.
        """
        rows = cursor.fetchall()
        # Same output as the Python 2 statement `print 'Request', rows`, but
        # also valid on Python 3.
        print('%s %s' % ('Request', rows))
        # Close the cursor like Test1Handler and Test2Handler do.
        cursor.close()
        self.write('Hello, world')
        self.finish()
class Test1Handler(BaseHandler):
    """Handler for `/test1`: runs a 15 second `pg_sleep` followed by a SELECT."""

    @tornado.web.asynchronous
    def get(self):
        """Issue the slow query; the response is finished in `_on_response`."""
        query = 'SELECT pg_sleep(15); SELECT 5454, 324, 2343;'
        self.db.execute(query, callback=self._on_response)

    def _on_response(self, cursor):
        """Print the fetched rows, close the cursor and reply 'Test 1'."""
        rows = cursor.fetchall()
        # Emits the same text as `print 'Request', rows`.
        print('%s %s' % ('Request', rows))
        cursor.close()
        self.write('Test 1')
        self.finish()
class Test2Handler(BaseHandler):
    """Handler for `/test2`: runs a 15 second `pg_sleep` followed by a SELECT."""

    @tornado.web.asynchronous
    def get(self):
        """Issue the slow query; the response is finished in `_on_response`."""
        query = 'SELECT pg_sleep(15); SELECT 636, 222, 123;'
        self.db.execute(query, callback=self._on_response)

    def _on_response(self, cursor):
        """Print the fetched rows, close the cursor and reply 'Test 2'."""
        rows = cursor.fetchall()
        # Emits the same text as `print 'Request', rows`.
        print('%s %s' % ('Request', rows))
        cursor.close()
        self.write('Test 2')
        self.finish()
def main():
    """Parse command-line options and run the HTTP server until interrupted.

    The server binds to the port given by the `port` option and forks
    sub-processes via `start(0)`. A `KeyboardInterrupt` prints 'Exit' and
    returns.
    """
    try:
        tornado.options.parse_command_line()
        http_server = HTTPServer(Application())
        # Honour the `port` option instead of a hard-coded 8888 (its default
        # is still 8888).
        http_server.bind(options.port)
        http_server.start(0)  # Forks multiple sub-processes
        IOLoop.instance().start()
    except KeyboardInterrupt:
        print('Exit')


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment