Created
June 4, 2015 21:22
-
-
Save miohtama/278fd4eeb9e5272d061c to your computer and use it in GitHub Desktop.
Perform database sync sanity check to SQLAlchemy models on application startup
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
from sqlalchemy import inspect | |
from sqlalchemy.ext.declarative.clsregistry import _ModuleMarker | |
from sqlalchemy.orm import RelationshipProperty | |
logger = logging.getLogger(__name__) | |
def is_sane_database(Base, session): | |
"""Check whether the current database matches the models declared in model base. | |
Currently we check that all tables exist with all columns. What is not checked | |
* Column types are not verified | |
* Relationships are not verified at all (TODO) | |
:param Base: Declarative Base for SQLAlchemy models to check | |
:param session: SQLAlchemy session bound to an engine | |
:return: True if all declared models have corresponding tables and columns. | |
""" | |
engine = session.get_bind() | |
iengine = inspect(engine) | |
errors = False | |
tables = iengine.get_table_names() | |
# Go through all SQLAlchemy models | |
for name, klass in Base._decl_class_registry.items(): | |
if isinstance(klass, _ModuleMarker): | |
# Not a model | |
continue | |
table = klass.__tablename__ | |
if table in tables: | |
# Check all columns are found | |
# Looks like [{'default': "nextval('sanity_check_test_id_seq'::regclass)", 'autoincrement': True, 'nullable': False, 'type': INTEGER(), 'name': 'id'}] | |
columns = [c["name"] for c in iengine.get_columns(table)] | |
mapper = inspect(klass) | |
for column_prop in mapper.attrs: | |
if isinstance(column_prop, RelationshipProperty): | |
# TODO: Add sanity checks for relations | |
pass | |
else: | |
for column in column_prop.columns: | |
# Assume normal flat column | |
if not column.key in columns: | |
logger.error("Model %s declares column %s which does not exist in database %s", klass, column.key, engine) | |
errors = True | |
else: | |
logger.error("Model %s declares table %s which does not exist in database %s", klass, table, engine) | |
errors = True | |
return not errors |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Tests for checking database sanity checks functions correctly.""" | |
from pyramid_web20.system.model.sanitycheck import is_sane_database | |
from sqlalchemy import engine_from_config, Column, Integer, String | |
import sqlalchemy | |
from sqlalchemy.ext.declarative import declarative_base, declared_attr | |
from sqlalchemy.ext.hybrid import hybrid_property | |
from sqlalchemy.orm import sessionmaker, relationship | |
from sqlalchemy import ForeignKey | |
def setup_module(self): | |
# Quiet log output for the tests | |
import logging | |
from pyramid_web20.system.model.sanitycheck import logger | |
#logger.setLevel(logging.FATAL) | |
def gen_test_model(): | |
Base = declarative_base() | |
class SaneTestModel(Base): | |
"""A sample SQLAlchemy model to demostrate db conflicts. """ | |
__tablename__ = "sanity_check_test" | |
#: Running counter used in foreign key references | |
id = Column(Integer, primary_key=True) | |
return Base, SaneTestModel | |
def gen_relation_models(): | |
Base = declarative_base() | |
class RelationTestModel(Base): | |
__tablename__ = "sanity_check_test_2" | |
id = Column(Integer, primary_key=True) | |
class RelationTestModel2(Base): | |
__tablename__ = "sanity_check_test_3" | |
id = Column(Integer, primary_key=True) | |
test_relationship_id = Column(ForeignKey("sanity_check_test_2.id")) | |
test_relationship = relationship(RelationTestModel, primaryjoin=test_relationship_id == RelationTestModel.id) | |
return Base, RelationTestModel, RelationTestModel2 | |
def gen_declarative(): | |
Base = declarative_base() | |
class DeclarativeTestModel(Base): | |
__tablename__ = "sanity_check_test_4" | |
id = Column(Integer, primary_key=True) | |
@declared_attr | |
def _password(self): | |
return Column('password', String(256), nullable=False) | |
@hybrid_property | |
def password(self): | |
return self._password | |
return Base, DeclarativeTestModel | |
def test_sanity_pass(ini_settings, dbsession): | |
"""See database sanity check completes when tables and columns are created.""" | |
engine = engine_from_config(ini_settings, 'sqlalchemy.') | |
conn = engine.connect() | |
trans = conn.begin() | |
Base, SaneTestModel = gen_test_model() | |
Session = sessionmaker(bind=engine) | |
session = Session() | |
try: | |
Base.metadata.drop_all(engine, tables=[SaneTestModel.__table__]) | |
except sqlalchemy.exc.NoSuchTableError: | |
pass | |
Base.metadata.create_all(engine, tables=[SaneTestModel.__table__]) | |
try: | |
assert is_sane_database(Base, session) is True | |
finally: | |
Base.metadata.drop_all(engine) | |
def test_sanity_table_missing(ini_settings, dbsession): | |
"""See check fails when there is a missing table""" | |
engine = engine_from_config(ini_settings, 'sqlalchemy.') | |
conn = engine.connect() | |
trans = conn.begin() | |
Base, SaneTestModel = gen_test_model() | |
Session = sessionmaker(bind=engine) | |
session = Session() | |
try: | |
Base.metadata.drop_all(engine, tables=[SaneTestModel.__table__]) | |
except sqlalchemy.exc.NoSuchTableError: | |
pass | |
assert is_sane_database(Base, session) is False | |
def test_sanity_column_missing(ini_settings, dbsession): | |
"""See check fails when there is a missing table""" | |
engine = engine_from_config(ini_settings, 'sqlalchemy.') | |
conn = engine.connect() | |
trans = conn.begin() | |
Session = sessionmaker(bind=engine) | |
session = Session() | |
Base, SaneTestModel = gen_test_model() | |
try: | |
Base.metadata.drop_all(engine, tables=[SaneTestModel.__table__]) | |
except sqlalchemy.exc.NoSuchTableError: | |
pass | |
Base.metadata.create_all(engine, tables=[SaneTestModel.__table__]) | |
# Delete one of the columns | |
engine.execute("ALTER TABLE sanity_check_test DROP COLUMN id") | |
assert is_sane_database(Base, session) is False | |
def test_sanity_pass_relationship(ini_settings, dbsession): | |
"""See database sanity check understands about relationships and don't deem them as missing column.""" | |
engine = engine_from_config(ini_settings, 'sqlalchemy.') | |
conn = engine.connect() | |
trans = conn.begin() | |
Session = sessionmaker(bind=engine) | |
session = Session() | |
Base, RelationTestModel, RelationTestModel2 = gen_relation_models() | |
try: | |
Base.metadata.drop_all(engine, tables=[RelationTestModel.__table__, RelationTestModel2.__table__]) | |
except sqlalchemy.exc.NoSuchTableError: | |
pass | |
Base.metadata.create_all(engine, tables=[RelationTestModel.__table__, RelationTestModel2.__table__]) | |
try: | |
assert is_sane_database(Base, session) is True | |
finally: | |
Base.metadata.drop_all(engine) | |
def test_sanity_pass_declarative(ini_settings, dbsession): | |
"""See database sanity check understands about relationships and don't deem them as missing column.""" | |
engine = engine_from_config(ini_settings, 'sqlalchemy.') | |
conn = engine.connect() | |
trans = conn.begin() | |
Session = sessionmaker(bind=engine) | |
session = Session() | |
Base, DeclarativeTestModel = gen_declarative() | |
try: | |
Base.metadata.drop_all(engine, tables=[DeclarativeTestModel.__table__]) | |
except sqlalchemy.exc.NoSuchTableError: | |
pass | |
Base.metadata.create_all(engine, tables=[DeclarativeTestModel.__table__]) | |
try: | |
assert is_sane_database(Base, session) is True | |
finally: | |
Base.metadata.drop_all(engine) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thank you. Do you mind if I include it in one of my PyPi packages (obviously crediting you and including a link to this gist)?