-
-
Save adhorn/b84dc47175259992d406 to your computer and use it in GitHub Desktop.
Flask-SQLAlchemy - separating reads and writes - Tested on AWS Aurora
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask.ext.sqlalchemy import SQLAlchemy, get_state | |
import sqlalchemy.orm as orm | |
from functools import partial | |
from flask import current_app | |
class RoutingSession(orm.Session): | |
def __init__(self, db, autocommit=False, autoflush=False, **options): | |
self.app = db.get_app() | |
self.db = db | |
self._model_changes = {} | |
orm.Session.__init__( | |
self, autocommit=autocommit, autoflush=autoflush, | |
bind=db.engine, | |
binds=db.get_binds(self.app), **options) | |
def get_bind(self, mapper=None, clause=None): | |
try: | |
state = get_state(self.app) | |
except (AssertionError, AttributeError, TypeError) as err: | |
current_app.logger.info( | |
"cant get configuration. default bind. Error:" + err) | |
return orm.Session.get_bind(self, mapper, clause) | |
""" | |
If there are no binds configured, connect using the default | |
SQLALCHEMY_DATABASE_URI | |
""" | |
if state is None or not self.app.config['SQLALCHEMY_BINDS']: | |
if not self.app.debug: | |
current_app.logger.info("Connecting -> DEFAULT") | |
return orm.Session.get_bind(self, mapper, clause) | |
elif self._name: | |
self.app.logger.debug("Connecting -> {}".format(self._name)) | |
return state.db.get_engine(self.app, bind=self._name) | |
# Writes go to the master | |
elif self._flushing: # we who are about to write, salute you | |
current_app.logger.info("Connecting -> MASTER") | |
return state.db.get_engine(self.app, bind='master') | |
# Everything else goes to the slave | |
else: | |
current_app.logger.info("Connecting -> SLAVE") | |
return state.db.get_engine(self.app, bind='slave') | |
_name = None | |
def using_bind(self, name): | |
s = RoutingSession(self.db) | |
vars(s).update(vars(self)) | |
s._name = name | |
return s | |
class RouteSQLAlchemy(SQLAlchemy): | |
def __init__(self, *args, **kwargs): | |
SQLAlchemy.__init__(self, *args, **kwargs) | |
self.session.using_bind = lambda s: self.session().using_bind(s) | |
def create_scoped_session(self, options=None): | |
if options is None: | |
options = {} | |
scopefunc = options.pop('scopefunc', None) | |
return orm.scoped_session( | |
partial(RoutingSession, self, **options), scopefunc=scopefunc | |
) | |
db = RouteSQLAlchemy() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
SQLALCHEMY_BINDS = { | |
'master': 'mysql://foobardbdbuser:[email protected]/foobardb1', | |
'slave': 'mysql://foobardbdbuser:[email protected]/foobardb2' | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# the read/write bind interface fails to intercept update function | |
# to be executed on the master - now it is forced upon the master. | |
db.session.using_bind("master").query( | |
FooModel).filter(FooModel.id == id).update( | |
{ | |
FooModel.count: FooModel.count + 1, | |
FooModel.updated: _foobar_timestamp | |
} | |
) | |
db.session.commit() | |
db.session.flush() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment