This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@pytest.fixture()
def memjobstore():
    """Return a newly constructed ``MemoryJobStore`` for the requesting test."""
    return MemoryJobStore()
@pytest.fixture() | |
def shelvejobstore(request): | |
def finish(): | |
jobstore.close() | |
if os.path.exists(jobstore.path): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from functools import wraps | |
import inspect | |
def typechecked(func): | |
argspec = inspect.getfullargspec(func) | |
@wraps(func) | |
def wrapper(*args, **kwargs): | |
for i, (arg, argname) in enumerate(zip(args, argspec.args)): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SchedulerImplementationTestBase(object): | |
@pytest.fixture | |
def scheduler(self, request): | |
sched = self.create_scheduler() | |
request.addfinalizer(lambda: self.finish(sched)) | |
return sched | |
def create_scheduler(self): | |
raise NotImplementedError |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
@pytest.fixture
def scheduler(request):
    """Provide a ``DummyScheduler`` to the requesting test.

    The scheduler is started up front when ``'start_scheduler'`` is among the
    requesting test's keywords. On teardown it is shut down, but only if it
    is still running at that point.
    """
    sched = DummyScheduler()
    if 'start_scheduler' in request.keywords:
        sched.start()

    def _shutdown_if_running():
        # The test may already have stopped the scheduler itself, so check
        # before shutting it down.
        if sched.running:
            sched.shutdown()

    request.addfinalizer(_shutdown_if_running)
    return sched
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class A:
    """Class that defines the context-manager hooks as classmethods."""

    @classmethod
    def __enter__(cls):
        """Do nothing and return ``None``."""
        pass

    @classmethod
    def __exit__(cls, type, value, traceback):
        """Do nothing and return ``None`` (exceptions are not suppressed)."""
        pass
# The `with` statement looks up __enter__/__exit__ on type(A), not on A
# itself, so the classmethods defined on A are not found and this statement
# raises (AttributeError before Python 3.11, TypeError from 3.11 onwards).
with A:
    pass
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class BaseRequestHandler(RequestHandler): | |
__dbsession = None | |
@property | |
def dbsession(self): | |
if not self.__dbsession: | |
self.__dbsession = Session(self.application.settings['engine'], expire_on_commit=False) | |
return self.__dbsession | |
def on_finish(self): |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import zmq | |
c = zmq.Context.instance()
s = c.socket(zmq.SUB)
s.subscribe = b''  # sets the SUBSCRIBE option to an empty prefix
s.bind('inproc://rpc-cmd')
s.unbind('inproc://rpc-cmd')  # the traceback recorded below points at this call
# Traceback (most recent call last): | |
# File "sockettest.py", line 6, in <module> | |
# s.unbind('inproc://rpc-cmd') |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class Author(db.Model):
    """Author model with indexed name columns and a proxy to its articles."""

    id = db.Column(db.Integer, primary_key=True)
    forenames = db.Column(db.String(120), index=True)
    lastname = db.Column(db.String(120), index=True)

    # Related 'ArticleAuthor' rows; the backref exposes `author` on that model.
    associations = db.relationship('ArticleAuthor', backref='author')
    # Reads the `article` attribute of each object in `associations`.
    articles = association_proxy('associations', 'article')
#intermediate table used for ordering authors within articles |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from sqlalchemy.engine import create_engine | |
from sqlalchemy.orm.scoping import scoped_session | |
from sqlalchemy.orm.session import sessionmaker | |
class SQLAlchemy: | |
def __init__(self, app=None): | |
if app is not None: | |
self.init_app(app) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class IntegerEnumWrapper(TypeDecorator): | |
impl = Integer | |
def __init__(self, cls): | |
TypeDecorator.__init__(self) | |
self.wrapped = cls | |
def process_bind_param(self, value, dialect): | |
if value is None: | |
return None |