Last active
August 29, 2015 14:16
-
-
Save blaflamme/dae53d354ca81c770887 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| from pyramid.interfaces import IRequest | |
| from pyramid.settings import asbool | |
| try: | |
| import pyramid_services | |
| except: | |
| pyramid_services = None | |
| from sqlalchemy import engine_from_config | |
| from sqlalchemy.ext.declarative import declarative_base | |
| from sqlalchemy.orm import sessionmaker | |
| from zope.interface import Interface | |
| from zope.sqlalchemy import register | |
| from .interfaces import ( | |
| ISQLAEngine, | |
| ISQLASessionMaker | |
| ) | |
| Base = declarative_base() | |
| class ISQLAEngine(Interface): | |
| pass | |
| class ISQLASessionMaker(Interface): | |
| pass | |
| def get_engine(config_or_request): | |
| if config_or_request.registry.queryUtility(ISQLAEngine): | |
| return config_or_request.registry.getUtility(ISQLAEngine) | |
| else: | |
| raise Exception( | |
| 'No SQLAlchemy engine registered to ISQLAEngine interface.' | |
| ) | |
| def get_session_maker(config_or_request): | |
| if config_or_request.registry.queryUtility(ISQLASessionMaker): | |
| return config_or_request.registry.getUtility(ISQLASessionMaker) | |
| else: | |
| raise Exception( | |
| 'No SQLAlchemy sessionmaker registered to ISQLASessionMaker interface.' | |
| ) | |
| def get_session(config_or_request): | |
| maker = get_session_maker(config_or_request) | |
| session = maker() | |
| if IRequest.providedBy(config_or_request): | |
| register(session, transaction_manager=config_or_request.tm) | |
| return session | |
| def register_sqla(registry): | |
| settings = registry.settings | |
| engine = engine_from_config(settings, 'sqlalchemy.') | |
| maker = sessionmaker() | |
| maker.configure(bind=engine) | |
| registry.registerUtility(engine, ISQLAEngine) | |
| registry.registerUtility(maker, ISQLASessionMaker) | |
| def includeme(config): | |
| settings = config.get_settings() | |
| config.include('pyramid_tm') | |
| register_sqla(config.registry) | |
| if pyramid_services and asbool(settings.get('sqla.session_service')): | |
| config.include(pyramid_services) | |
| def _session_service(context, request): | |
| return get_session(request) | |
| config.register_service_factory(_session_service, name='db') | |
| else: | |
| config.add_request_method(get_session, 'db', reify=True) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| import os | |
| import sys | |
| import transaction | |
| from pyramid.paster import ( | |
| bootstrap, | |
| setup_logging | |
| ) | |
| from pyramid.scripts.common import parse_vars | |
| from pyramid_sqla import ( | |
| get_engine, | |
| get_session_maker, | |
| register_sqla, | |
| Base | |
| ) | |
| from ..models import MyModel | |
| def usage(argv): | |
| cmd = os.path.basename(argv[0]) | |
| print( | |
| 'usage: %s <config_uri> [var=value]\n' | |
| '(example: "%s development.ini")' % (cmd, cmd) | |
| ) | |
| sys.exit(1) | |
| def main(argv=sys.argv): | |
| if len(argv) < 2: | |
| usage(argv) | |
| config_uri = argv[1] | |
| setup_logging(config_uri) | |
| env = bootstrap(config_uri) | |
| request = env['request'] | |
| registry = env['registry'] | |
| register_sqla(registry) | |
| engine = get_engine(request) | |
| db = get_session_maker(request)() | |
| Base.metadata.create_all(engine) | |
| with transaction.manager: | |
| model = MyModel(name='one', value=1) | |
| db.add(model) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| import unittest | |
| import transaction | |
| from pyramid import testing | |
| from pyramid_sqla import ( | |
| get_engine, | |
| get_session_maker, | |
| Base | |
| ) | |
| from .models import MyModel | |
| class TestMyViewSuccessCondition(unittest.TestCase): | |
| def setUp(self): | |
| settings = {'sqlalchemy.url': 'sqlite:///:memory:'} | |
| self.config = testing.setUp(settings=settings) | |
| self.config.include('pyramid_sqla') | |
| engine = get_engine(self.config) | |
| Base.metadata.create_all(engine) | |
| self.db = get_session_maker(self.config)() | |
| with transaction.manager: | |
| model = MyModel(name='one', value=55) | |
| self.db.add(model) | |
| def tearDown(self): | |
| self.db.close() | |
| testing.tearDown() | |
| def test_passing_view(self): | |
| from .views import my_view | |
| request = testing.DummyRequest(db=self.db) | |
| info = my_view(request) | |
| self.assertEqual(info['one'].name, 'one') | |
| self.assertEqual(info['project'], 'alchemy') |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| from pyramid.response import Response | |
| from pyramid.settings import asbool | |
| from pyramid.view import view_config | |
| from sqlalchemy.exc import DBAPIError | |
| from .models import MyModel | |
| @view_config( | |
| route_name='home', | |
| renderer='templates/mytemplate.pt' | |
| ) | |
| def my_view(request): | |
| settings = request.registry.settings | |
| # Normally it would be as a request prop or as a service | |
| if asbool(settings.get('sqla.session_service')): | |
| db = request.find_service(name='db') | |
| else: | |
| db = request.db | |
| try: | |
| one = db.query(MyModel).filter(MyModel.name == 'one').first() | |
| except DBAPIError: | |
| return Response('Connection Error!', content_type='text/plain', status_int=500) | |
| return {'one': one, 'project': 'alchemy'} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment