""" | |
SQLAlchemy, PostgreSQL (psycopg2), and autocommit | |
See blog post: http://oddbird.net/2014/06/14/sqlalchemy-postgres-autocommit/ | |
""" | |
from contextlib import contextmanager | |
from sqlalchemy import create_engine, event | |
from sqlalchemy.orm import sessionmaker, Session as BaseSession | |
class Session(BaseSession): | |
def __init__(self, *a, **kw): | |
super(Session, self).__init__(*a, **kw) | |
self._in_atomic = False | |
@contextmanager | |
def atomic(self): | |
"""Transaction context manager. | |
Will commit the transaction on successful completion of the block, or | |
roll it back on error. | |
Supports nested usage (via savepoints). | |
""" | |
nested = self._in_atomic | |
self.begin(nested=nested) | |
self._in_atomic = True | |
try: | |
yield | |
except: | |
self.rollback() | |
raise | |
else: | |
self.commit() | |
finally: | |
if not nested: | |
self._in_atomic = False | |
class Database(object): | |
def __init__(self, db_uri): | |
self.engine = create_engine(db_uri, isolation_level="AUTOCOMMIT") | |
self.Session = sessionmaker(bind=self.engine, class_=Session, autocommit=True) | |
# Keep track of which DBAPI connection(s) had autocommit turned off for | |
# a particular transaction object. | |
dconns_by_trans = {} | |
@event.listens_for(self.Session, 'after_begin') | |
def receive_after_begin(session, transaction, connection): | |
"""When a (non-nested) transaction begins, turn autocommit off.""" | |
dbapi_connection = connection.connection.connection | |
if transaction.nested: | |
assert not dbapi_connection.autocommit | |
return | |
assert dbapi_connection.autocommit | |
dbapi_connection.autocommit = False | |
dconns_by_trans.setdefault(transaction, set()).add( | |
dbapi_connection) | |
@event.listens_for(self.Session, 'after_transaction_end') | |
def receive_after_transaction_end(session, transaction): | |
"""Restore autocommit anywhere this transaction turned it off.""" | |
if transaction in dconns_by_trans: | |
for dbapi_connection in dconns_by_trans[transaction]: | |
assert not dbapi_connection.autocommit | |
dbapi_connection.autocommit = True | |
del dconns_by_trans[transaction] |
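The nesting bookkeeping in `atomic` can be exercised without a database. Below is a minimal sketch using a hypothetical `FakeAtomicSession` stand-in (not part of the gist) that records `begin`/`commit`/`rollback` calls instead of talking to SQLAlchemy; it mirrors only the control flow of the `atomic` context manager above.

```python
from contextlib import contextmanager


class FakeAtomicSession:
    """Hypothetical stand-in mirroring Session.atomic's nesting logic,
    recording calls instead of touching a database."""

    def __init__(self):
        self._in_atomic = False
        self.calls = []

    def begin(self, nested=False):
        self.calls.append(('begin', nested))

    def commit(self):
        self.calls.append(('commit',))

    def rollback(self):
        self.calls.append(('rollback',))

    @contextmanager
    def atomic(self):
        # Same control flow as Session.atomic above.
        nested = self._in_atomic
        self.begin(nested=nested)
        self._in_atomic = True
        try:
            yield
        except Exception:
            self.rollback()
            raise
        else:
            self.commit()
        finally:
            if not nested:
                self._in_atomic = False


ok = FakeAtomicSession()
with ok.atomic():          # outer block: a real transaction
    with ok.atomic():      # inner block: begins with nested=True (a savepoint)
        pass
print(ok.calls)
# -> [('begin', False), ('begin', True), ('commit',), ('commit',)]

failed = FakeAtomicSession()
try:
    with failed.atomic():
        with failed.atomic():
            raise ValueError("boom")
except ValueError:
    pass
print(failed.calls)
# -> [('begin', False), ('begin', True), ('rollback',), ('rollback',)]
```

On error, the inner block rolls back its savepoint and re-raises, so the outer block rolls back the enclosing transaction too.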
Hey, this is very interesting, as I'm dealing with a related problem: I'm executing concurrent inserts from a multi-process environment and getting IntegrityErrors because of unique key violations. I thought of using savepoints via atomic transactions, so I'm trying to understand your solution, but I also found this in the docs:
`begin_nested()`, in the same manner as the less often used `begin()` method, returns a transactional object which also works as a context manager. It can be succinctly used around individual record inserts in order to catch things like unique constraint exceptions:

```python
for record in records:
    try:
        with session.begin_nested():
            session.merge(record)
    except:
        print "Skipped record %s" % record
session.commit()
```

How is this different from your solution? Doesn't it work on PostgreSQL? Thanks anyway, I'll keep studying.
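For the duplicate-insert use case described above, the per-record savepoint pattern from the quoted docs can be sketched independently of SQLAlchemy. This is a hypothetical illustration using Python's stdlib `sqlite3` (not PostgreSQL/psycopg2) with raw `SAVEPOINT` statements; the control flow is the same idea.

```python
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)  # autocommit mode
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY)")

conn.execute("BEGIN")  # one enclosing transaction for the whole batch
skipped = []
for record in [1, 2, 2, 3]:  # the second 2 violates the primary key
    try:
        conn.execute("SAVEPOINT sp")
        conn.execute("INSERT INTO users VALUES (?)", (record,))
        conn.execute("RELEASE sp")
    except sqlite3.IntegrityError:
        # Roll back just this record's savepoint; the batch survives.
        conn.execute("ROLLBACK TO sp")
        conn.execute("RELEASE sp")
        skipped.append(record)
conn.execute("COMMIT")

count = conn.execute("SELECT count(*) FROM users").fetchone()[0]
print(count, skipped)  # -> 3 [2]
```

The failed insert is discarded at its savepoint while the other three records commit together at the end.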
@ducu `begin_nested()` only creates a savepoint within your current transaction, not a full transaction. `begin()` returns a `SessionTransaction` object which can be used as a context manager, and it will error out if a transaction was already implicitly begun, so it's similar to my `atomic` context manager above. The main difference is that with my code, the connection is in a true "autocommit" state in between transactions, so you can do one-shot writes or read-only queries without starting a transaction (and thus potentially avoid "idle in transaction" or the overhead of an extra round-trip for a `COMMIT` that isn't actually committing anything).
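To make the "true autocommit in between transactions" point concrete: in autocommit mode each statement commits on its own, and no transaction is left open between statements. The behaviour isn't Postgres-specific; here is a small sketch using stdlib `sqlite3` with `isolation_level=None` (its autocommit mode) rather than psycopg2, purely for illustration.

```python
import sqlite3

# isolation_level=None is sqlite3's autocommit mode: every statement commits
# immediately, and no transaction stays open in between.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE t (x INTEGER)")
conn.execute("INSERT INTO t VALUES (1)")   # one-shot write, committed at once
idle = conn.in_transaction                 # False: nothing "idle in transaction"

# A transaction exists only while explicitly opened:
conn.execute("BEGIN")
conn.execute("INSERT INTO t VALUES (2)")
open_tx = conn.in_transaction              # True inside the explicit BEGIN
conn.execute("COMMIT")

count = conn.execute("SELECT count(*) FROM t").fetchone()[0]
print(idle, open_tx, count)  # -> False True 2
```

The gist's event listeners achieve the analogous state on psycopg2 by toggling the DBAPI connection's `autocommit` flag around each SQLAlchemy transaction.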
@carljm Thanks so much for the blog post that accompanied this – very helpful! Did anything ever come of this? Is this still your approach to transaction management, or is there a better solution now?
Nice research. But isn't it some sort of hack around SQLAlchemy's by-design behaviour?
Just saw the latest comment notification and realized I missed some earlier comments. Thanks for the kind words about the blog post! A couple responses:
Did anything ever come of this? Is this still your approach to transaction management, or is there a better solution now?
In my current work I'm no longer using either Postgres or SQLAlchemy. The project I did this on is still running in prod and there haven't been issues caused by this approach that I know of. But I only ever did it on one project.
isn't it some sort of hack around SQLAlchemy's by-design behaviour
Yup. I was curious to see if it could be done, and as I mentioned in the blog post, I'm not super happy with how hacky the solution feels, and wish there were a well-supported way to get this behavior in SQLAlchemy.
Thanks for the nice blog post and all the explanations.
But please make sure not to use this code as a dependency in a FastAPI project. A commit placed after the yield in a dependency is only executed after the response has been returned to the client.
From https://fastapi.tiangolo.com/tutorial/dependencies/dependencies-with-yield/:
Only the code prior to and including the yield statement is executed before creating a response
For more discussions see e.g. fastapi/fastapi#3620
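The ordering problem can be simulated without FastAPI: a dependency with `yield` behaves like a context manager whose post-`yield` code (the commit) runs only after the response has been produced. A hypothetical sketch of that ordering:

```python
from contextlib import contextmanager

events = []


@contextmanager
def get_session():
    # Roughly what a FastAPI dependency with yield does around a request.
    events.append("open session")
    try:
        yield "session"
        events.append("commit")  # post-yield code: runs after the response
    finally:
        events.append("close session")


# Simplified order of operations for one request:
with get_session():
    events.append("run endpoint")
    events.append("return response to client")

print(events)
# -> ['open session', 'run endpoint', 'return response to client',
#     'commit', 'close session']
```

The client already has its response before "commit" happens, so a commit failure there can no longer be reported as an error response.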
I've been wrestling the past couple weeks with the subtle nuances of Postgres and SQLAlchemy's transaction management. Thanks for the great blog post as well as this code. This helps me out a lot!