Last active
November 22, 2018 03:08
-
-
Save mmerickel/43c136cd24d510ffdcd828ae04d87b72 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from alembic import context | |
from pyramid.paster import setup_logging | |
from sqlalchemy import create_engine, pool | |
# this is the Alembic Config object, which provides | |
# access to the values within the .ini file in use. | |
config = context.config | |
# Interpret the config file for Python logging. | |
# This line sets up loggers basically. | |
if hasattr(config, 'config_file_name'): | |
path = config.config_file_name | |
setup_logging(path) | |
# add your model's MetaData object here | |
# for 'autogenerate' support | |
# from myapp import mymodel | |
# target_metadata = mymodel.Base.metadata | |
import myapp.model.meta.base # noqa | |
target_metadata = myapp.model.meta.base.metadata | |
def run_migrations_online(): | |
"""Run migrations in 'online' mode. | |
In this scenario we need to create an Engine | |
and associate a connection with the context. | |
""" | |
url = config.get_section('db')['url'] | |
engine = create_engine(url, poolclass=pool.NullPool) | |
connection = engine.connect() | |
context.configure( | |
connection=connection, | |
target_metadata=target_metadata, | |
transaction_per_migration=True, | |
) | |
try: | |
with context.begin_transaction(): | |
context.run_migrations() | |
finally: | |
connection.close() | |
if context.is_offline_mode(): | |
raise NotImplementedError | |
else: | |
run_migrations_online() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import weakref | |
from alembic.config import Config | |
from alembic.migration import MigrationContext | |
from alembic.script import ScriptDirectory | |
import sqlalchemy as sa | |
from sqlalchemy.engine.url import make_url | |
from sqlalchemy.orm import configure_mappers | |
from .types import json_deserializer, json_serializer | |
log = __import__('logging').getLogger(__name__) | |
engine_cache = weakref.WeakValueDictionary() | |
def get_engine( | |
settings, | |
prefix='db.', | |
check_version=True, | |
pessimistic=True, | |
): | |
engine = None | |
prefix = prefix or '' | |
# pull out options because not all of them can be passed to | |
# the engine_from_config fn | |
opts = { | |
k[len(prefix):]: v | |
for k, v in settings.items() | |
if k.startswith(prefix) | |
} | |
opts.pop('here', None) | |
connect_timeout = opts.pop('connect_timeout', None) | |
lock_timeout = opts.pop('lock_timeout', None) | |
url = make_url(opts['url']) | |
query = url.query or {} | |
if connect_timeout: | |
query['connect_timeout'] = int(int(connect_timeout) / 1000) | |
if lock_timeout: | |
query['options'] = f'-c lock_timeout={lock_timeout}' | |
url.query = query | |
opts['url'] = url | |
# attempt to load the engine from the cache | |
engine_id = opts.pop('engine_id') | |
if engine_id: | |
engine = engine_cache.get(engine_id) | |
if engine: | |
log.debug('using cached engine for id=%s', engine_id) | |
# if no engine was found in the cache, load one | |
if not engine: | |
engine = sa.engine_from_config( | |
opts, | |
prefix='', | |
isolation_level='READ COMMITTED', | |
json_serializer=json_serializer, | |
json_deserializer=json_deserializer, | |
pool_pre_ping=pessimistic, | |
) | |
# assert the engine is at the correct migration | |
if check_version: | |
current_version = get_current_version(engine) | |
log.info('current migration: %s', current_version) | |
latest_version = get_latest_version(engine) | |
log.debug('latest migration: %s', latest_version) | |
if current_version != latest_version: | |
raise RuntimeError( | |
'database versions out of sync, %s != %s' % ( | |
current_version, latest_version)) | |
# for some reason the mappers are not properly configured at this stage | |
# and some model objects were missing relationships such as | |
# AppInstall.project_site_links | |
configure_mappers() | |
# store engine in cache | |
if engine_id: | |
engine_cache[engine_id] = engine | |
return engine | |
def get_current_version(engine): | |
conn = engine.connect() | |
try: | |
ctx = MigrationContext.configure(conn) | |
return ctx.get_current_revision() | |
finally: | |
conn.close() | |
def get_latest_version(engine): | |
cfg = Config() | |
cfg.set_main_option('script_location', 'myapp.model:migrations') | |
script = ScriptDirectory.from_config(cfg) | |
return script.get_current_head() |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyramid.interfaces import IRequest | |
from pyramid_services import NewServiceContainer | |
import transaction | |
from wired import ServiceRegistry | |
from myapp.model.meta.engine import get_engine | |
from myapp.model.meta.session import get_session_factory | |
from myapp.model.meta.session import get_tm_session | |
from myapp.utils.settings import asbool | |
from .login import LoginService | |
log = __import__('logging').getLogger(__name__) | |
def make_service_factory(settings, flags=None): | |
if flags is None: | |
flags = {} | |
def flag(name, default): | |
return asbool(flags.get(name, default)) | |
services = ServiceRegistry() | |
services.register_singleton(settings, name='settings') | |
engine = get_engine(settings, pessimistic=flag('pessimistic_engine', True)) | |
services.register_singleton(engine, name='dbengine') | |
dbmaker = get_session_factory(engine) | |
services.register_singleton(dbmaker, name='dbmaker') | |
def tm_factory(services): | |
return transaction.TransactionManager(explicit=True) | |
services.register_factory(tm_factory, name='tm') | |
def db_factory(services): | |
tm = services.get(name='tm') | |
return get_tm_session(dbmaker, transaction_manager=tm) | |
services.register_factory(db_factory, name='db') | |
mq_source = load_source_from_settings(settings) | |
def mq_factory(services): | |
db = services.get(name='db') | |
tm = services.get(name='tm') | |
mq = mq_source.bind(db=db, transaction_manager=tm) | |
return mq | |
services.register_factory(mq_factory, name='mq') | |
def login_factory(services): | |
db = services.get(name='db') | |
svc = LoginService(db, settings) | |
return svc | |
services.register_factory(login_factory, LoginService) | |
return services | |
def includeme(config): | |
settings = config.get_settings() | |
settings.setdefault('tm.manager_hook', 'pyramid_tm.explicit_manager') | |
config.include('pyramid_tm') | |
config.include('pyramid_retry') | |
config.include('pyramid_services') | |
services = make_service_factory(settings, flags={ | |
# on web requests we rely on pyramid_retry to handle connection | |
# errors with the database but in background scripts we want | |
# to ping first and get a good connection from the pool because | |
# there is no retry | |
'pessimistic_engine': False, | |
}) | |
config.set_service_registry(services) | |
def on_new_container(event): | |
services = event.container | |
request = event.request | |
# override the default tm in the container with request.tm | |
# when servicing requests | |
services.set(request.tm, name='tm') | |
config.add_subscriber(on_new_container, NewServiceContainer) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zope.sqlalchemy | |
from sqlalchemy.orm import sessionmaker | |
log = __import__('logging').getLogger(__name__) | |
def get_session_factory(engine): | |
maker = sessionmaker() | |
maker.configure(bind=engine) | |
return maker | |
def get_tm_session(session_factory, transaction_manager): | |
session = session_factory() | |
zope.sqlalchemy.register(session, transaction_manager=transaction_manager) | |
return session |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pyramid.view import view_config | |
from myapp import services as S | |
@view_config( | |
route_name='login', | |
) | |
def login_view(request): | |
svc = request.find_service(S.LoginService) | |
... |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment