Skip to content

Instantly share code, notes, and snippets.

@adhorn

adhorn/db.py Secret

Created October 9, 2015 11:55
Show Gist options
  • Save adhorn/b84dc47175259992d406 to your computer and use it in GitHub Desktop.
Save adhorn/b84dc47175259992d406 to your computer and use it in GitHub Desktop.
Flask-SQLAlchemy - separating reads and writes - Tested on AWS Aurora
from flask.ext.sqlalchemy import SQLAlchemy, get_state
import sqlalchemy.orm as orm
from functools import partial
from flask import current_app
class RoutingSession(orm.Session):
def __init__(self, db, autocommit=False, autoflush=False, **options):
self.app = db.get_app()
self.db = db
self._model_changes = {}
orm.Session.__init__(
self, autocommit=autocommit, autoflush=autoflush,
bind=db.engine,
binds=db.get_binds(self.app), **options)
def get_bind(self, mapper=None, clause=None):
try:
state = get_state(self.app)
except (AssertionError, AttributeError, TypeError) as err:
current_app.logger.info(
"cant get configuration. default bind. Error:" + err)
return orm.Session.get_bind(self, mapper, clause)
"""
If there are no binds configured, connect using the default
SQLALCHEMY_DATABASE_URI
"""
if state is None or not self.app.config['SQLALCHEMY_BINDS']:
if not self.app.debug:
current_app.logger.info("Connecting -> DEFAULT")
return orm.Session.get_bind(self, mapper, clause)
elif self._name:
self.app.logger.debug("Connecting -> {}".format(self._name))
return state.db.get_engine(self.app, bind=self._name)
# Writes go to the master
elif self._flushing: # we who are about to write, salute you
current_app.logger.info("Connecting -> MASTER")
return state.db.get_engine(self.app, bind='master')
# Everything else goes to the slave
else:
current_app.logger.info("Connecting -> SLAVE")
return state.db.get_engine(self.app, bind='slave')
_name = None
def using_bind(self, name):
s = RoutingSession(self.db)
vars(s).update(vars(self))
s._name = name
return s
class RouteSQLAlchemy(SQLAlchemy):
def __init__(self, *args, **kwargs):
SQLAlchemy.__init__(self, *args, **kwargs)
self.session.using_bind = lambda s: self.session().using_bind(s)
def create_scoped_session(self, options=None):
if options is None:
options = {}
scopefunc = options.pop('scopefunc', None)
return orm.scoped_session(
partial(RoutingSession, self, **options), scopefunc=scopefunc
)
db = RouteSQLAlchemy()
SQLALCHEMY_BINDS = {
'master': 'mysql://foobardbdbuser:[email protected]/foobardb1',
'slave': 'mysql://foobardbdbuser:[email protected]/foobardb2'
}
# the read/write bind interface fails to intercept update function
# to be executed on the master - now it is forced upon the master.
db.session.using_bind("master").query(
FooModel).filter(FooModel.id == id).update(
{
FooModel.count: FooModel.count + 1,
FooModel.updated: _foobar_timestamp
}
)
db.session.commit()
db.session.flush()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment