Last active
May 17, 2017 22:41
-
-
Save dfee/e1dc510c24cedf7f79fb9e34024cda5f to your computer and use it in GitHub Desktop.
Guarding functions in asphalt that should only run in an executor.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
from asphalt.core import Context | |
import functools | |
def guard_executor(func):
    """Decorate *func* so that it refuses to run in the context's loop thread.

    On every call the wrapper locates a ``Context``: first among the first
    two positional arguments, then as the ``ctx`` attribute of the first
    positional argument (the ``self`` of an instance method).  It then
    compares the calling thread's event loop with ``ctx.loop`` and only
    invokes *func* when they differ.

    :param func: the callable to guard
    :return: a wrapper with the same call signature as *func*
    :raises RuntimeError: (from the wrapper) when no ``Context`` can be
        found, or when the calling thread's event loop is ``ctx.loop``
    """
    no_ctx_msg = ('Context must be the first arg, or an instance variable of '
                  '`self` (if wrapping an instance method).')
    not_in_executor_msg = '{} must be run in an executor'.format(func.__name__)

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        ctx = _find_context(args)
        if ctx is None:
            # Raised outside of any ``except`` clause so that the error does
            # not carry an unrelated StopIteration as its implicit context.
            raise RuntimeError(no_ctx_msg)

        # asyncio raises RuntimeError when the calling thread has no event
        # loop; that case is treated as "no loop" (None).
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = None

        # Event loops are compared by identity: the same loop object means
        # this call is happening in the context's own loop thread.
        if loop is ctx.loop:
            raise RuntimeError(not_in_executor_msg)

        return func(*args, **kwargs)

    return wrapper


def _find_context(args):
    """Return the ``Context`` reachable from positional *args*, else ``None``."""
    # A plain function receives the context first; a bound method receives
    # it right after ``self`` -- hence only the first two positions count.
    for arg in args[:2]:
        if isinstance(arg, Context):
            return arg

    # Otherwise fall back to a ``ctx`` attribute on the first argument.
    if args:
        candidate = getattr(args[0], 'ctx', None)
        if isinstance(candidate, Context):
            return candidate

    return None
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from asphalt.core import executor | |
import pytest | |
from spark.core.utils import guard_executor | |
class GuardKls:
    """Test helper whose ``run`` method is wrapped by ``guard_executor``.

    Keyword arguments given to the constructor become instance attributes
    (e.g. ``ctx``); ``called`` records whether ``run`` has executed.
    """

    def __init__(self, **kwargs):
        # Copy every keyword argument onto the instance, then reset the flag
        # (so a ``called`` keyword, if given, is always overridden).
        vars(self).update(kwargs)
        self.called = False

    @guard_executor
    def run(self, *args):
        """Mark the instance as called; extra positional args are ignored."""
        self.called = True
@guard_executor
def guard_fn(*args, **kwargs):
    """Guarded plain function used by the tests; always returns ``True``."""
    return True
class TestGuardExecutor:
    """Tests for ``guard_executor`` applied to a method and to a function.

    The ``ctx`` parameter of the tests is not defined in this file; it is
    presumably a pytest fixture providing an asphalt ``Context`` -- confirm
    against the project's conftest.
    """

    # Message expected whenever the decorator cannot locate a Context.
    NO_CTX_MSG = (
        'Context must be the first arg, or an instance variable of '
        '`self` (if wrapping an instance method).'
    )

    @pytest.mark.asyncio
    async def test_class_with_ctx(self, ctx):
        """The context is found through the instance's ``ctx`` attribute."""
        guarded = GuardKls(ctx=ctx)
        await executor(guarded.run)()
        assert guarded.called

    @pytest.mark.asyncio
    async def test_class_without_ctx(self, ctx):
        """The context is found as the argument following ``self``."""
        guarded = GuardKls()
        await executor(guarded.run)(ctx)
        assert guarded.called

    @pytest.mark.asyncio
    async def test_class_without_ctx_raises(self, ctx):
        """A method call with no reachable context raises and does not run."""
        guarded = GuardKls()
        with pytest.raises(RuntimeError) as excinfo:
            await executor(guarded.run)()
        assert excinfo.value.args[0] == self.NO_CTX_MSG
        assert not guarded.called

    @pytest.mark.asyncio
    async def test_function(self, ctx):
        """A plain function receiving the context first runs normally."""
        result = await executor(guard_fn)(ctx)
        assert result

    @pytest.mark.asyncio
    async def test_function_without_ctx_raises(self):
        """A plain function called without any context raises."""
        with pytest.raises(RuntimeError) as excinfo:
            await executor(guard_fn)()
        assert excinfo.value.args[0] == self.NO_CTX_MSG

    def test_in_loop_thread_raises(self, ctx):
        """Calling the guarded function directly, not via ``executor``, raises."""
        with pytest.raises(RuntimeError) as excinfo:
            guard_fn(ctx)
        assert excinfo.value.args[0] == 'guard_fn must be run in an executor'
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment