Created
March 23, 2021 15:33
-
-
Save MihanixA/d31f6790f4f0692e49968c98870cbe78 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import uuid | |
from typing import Iterator | |
from collections import deque | |
from contextlib import contextmanager | |
import threading | |
import traceback | |
import sqlalchemy | |
import sqlalchemy.event | |
import sqlalchemy.orm | |
from sqlalchemy.orm import Session | |
# importing logger class | |
from common.logger import Logger | |
__all__ = ( | |
'DatabaseIsolationLevel', | |
'DatabaseMixin', | |
'uuid_simple' | |
) | |
class DatabaseIsolationLevel: | |
read_committed = 'READ_COMMITTED' | |
repeatable_read = 'REPEATABLE_READ' | |
serializable = 'SERIALIZABLE' | |
default = read_committed | |
def uuid_simple(): | |
return uuid.uuid4().hex[:8] | |
class DatabaseMixin: | |
__session_class__ = Session | |
__db_thread_local = threading.local() | |
__db_logger = Logger('db') | |
def __init_subclass__(cls, db_url=None, db_base=None, **kwargs): | |
super().__init_subclass__(**kwargs) | |
cls._db_url = db_url | |
cls._db_base = db_base | |
def engine(self, isolation_level=None) -> sqlalchemy.engine.Engine: | |
isolation_level = isolation_level or DatabaseIsolationLevel.default | |
key = (self._db_url, isolation_level) | |
if not hasattr(self.__db_thread_local, 'engines'): | |
self.__db_thread_local.engines = {} | |
engine = self.__db_thread_local.engines.get(key, None) | |
if engine is None: | |
engine = sqlalchemy.create_engine(self._db_url, isolation_level=isolation_level, | |
pool_recycle=500, pool_size=30, pool_pre_ping=True) | |
self._db_base.metadata.create_all(engine) | |
def cut_long(statement, length: int = 3000): | |
statement = str(statement).replace('\n', ' ') | |
while ' ' in statement: | |
statement = statement.replace(' ', ' ') | |
return statement[:length] | |
@sqlalchemy.event.listens_for(engine, 'before_cursor_execute') | |
def before_cursor_execute(conn, _, statement, params, ____, _____): | |
query_id = uuid_simple() | |
conn.info.setdefault('query_start_time', deque()).appendleft(time.time()) | |
conn.info.setdefault('query_id', deque()).appendleft(query_id) | |
self.__db_logger.debug(f'Query started', query_id=query_id, statement=cut_long(statement), | |
parameters=cut_long(params)) | |
@sqlalchemy.event.listens_for(engine, 'after_cursor_execute') | |
def after_cursor_execute(conn, _, __, ___, ____, _____): | |
query_id = conn.info['query_id'].pop() | |
elapsed = time.time() - conn.info['query_start_time'].pop() | |
elapsed_ms = int(elapsed * 1000) | |
self.__db_logger.debug(f'Query completed', query_id=query_id, elapsed_ms=elapsed_ms) | |
self.__db_logger.info(f'Connected to database', db_url=repr(engine), isolation_level=isolation_level) | |
self.__db_thread_local.engines[key] = engine | |
return engine | |
def sessionmaker(self, isolation_level=None) -> sqlalchemy.orm.sessionmaker: | |
isolation_level = isolation_level or DatabaseIsolationLevel.default | |
key = (self._db_url, isolation_level) | |
if not hasattr(self.__db_thread_local, 'sessionmakers'): | |
self.__db_thread_local.sessionmakers = {} | |
sessionmaker = self.__db_thread_local.sessionmakers.get(key, None) | |
if sessionmaker is None: | |
sessionmaker = sqlalchemy.orm.sessionmaker(bind=self.engine(isolation_level=isolation_level), | |
class_=self.__session_class__) | |
self.__db_thread_local.sessionmakers[key] = sessionmaker | |
return sessionmaker | |
@contextmanager | |
def connect(self, autoflush=False, autocommit=False, expire_on_commit=False, | |
isolation_level=None) -> Iterator[Session]: | |
""" | |
Example usage: | |
db = DatabaseETL() | |
with db.connect() as session: | |
do_something(session) | |
:param autoflush: When ``True``, all query operations will issue a | |
:meth:`~.Session.flush` call to this ``Session`` before proceeding. | |
This is a convenience feature so that :meth:`~.Session.flush` need | |
not be called repeatedly in order for database queries to retrieve | |
results. It's typical that ``autoflush`` is used in conjunction | |
with ``autocommit=False``. In this scenario, explicit calls to | |
:meth:`~.Session.flush` are rarely needed; you usually only need to | |
call :meth:`~.Session.commit` (which flushes) to finalize changes. | |
:param autocommit: | |
.. warning:: | |
The autocommit flag is **not for general use**, and if it is | |
used, queries should only be invoked within the span of a | |
:meth:`.Session.begin` / :meth:`.Session.commit` pair. Executing | |
queries outside of a demarcated transaction is a legacy mode | |
of usage, and can in some cases lead to concurrent connection | |
checkouts. | |
Defaults to ``False``. When ``True``, the | |
:class:`.Session` does not keep a persistent transaction running, | |
and will acquire connections from the engine on an as-needed basis, | |
returning them immediately after their use. Flushes will begin and | |
commit (or possibly rollback) their own transaction if no | |
transaction is present. When using this mode, the | |
:meth:`.Session.begin` method is used to explicitly start | |
transactions. | |
:param expire_on_commit: Defaults to ``False``. When ``True``, all | |
instances will be fully expired after each :meth:`~.commit`, | |
so that all attribute/object access subsequent to a completed | |
transaction will load from the most recent database state. | |
:param isolation_level: this string parameter is interpreted by various | |
dialects in order to affect the transaction isolation level of the | |
database connection. The parameter essentially accepts some subset of | |
these string arguments: ``"SERIALIZABLE"``, ``"REPEATABLE_READ"``, | |
``"READ_COMMITTED"``, ``"READ_UNCOMMITTED"`` and ``"AUTOCOMMIT"``. | |
Behavior here varies per backend, and | |
individual dialects should be consulted directly. | |
""" | |
transaction_id = uuid_simple() | |
session: Session = self.sessionmaker(isolation_level)(autoflush=autoflush, autocommit=autocommit, | |
expire_on_commit=expire_on_commit) | |
self.__db_logger.debug('Transaction started', transaction_id=transaction_id) | |
try: | |
yield session | |
session.commit() | |
except BaseException as e: | |
self.__db_logger.warning(f'Rolling back db transaction {transaction_id}', exception=e, | |
traceback=traceback.format_exc()) | |
session.rollback() | |
raise | |
finally: | |
session.close() | |
self.__db_logger.debug('Transaction finished', transaction_id=transaction_id) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment