Last active
January 29, 2019 11:54
-
-
Save vxgmichel/8ac8729e0f0cf58ee49e0fcc6c76a7b4 to your computer and use it in GitHub Desktop.
Helper to wrap and monitor a trio task
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import attr | |
import trio | |
@attr.s
class TaskStatus:
    """Handle used to monitor and cancel a wrapped trio task.

    The underscore-prefixed attributes and ``_set_*`` methods are written by
    the code that runs the task; consumers are expected to use the
    properties and the ``cancel`` / ``wait`` methods.
    """

    # Internal state

    # Cancel scope controlling the task; None until one is attached.
    _cancel_scope = attr.ib(default=None)
    # Value recorded once the task has reported that it started.
    _started_value = attr.ib(default=None)
    # Event set once the task is over.
    _finished_event = attr.ib(factory=trio.Event)

    def _set_cancel_scope(self, scope):
        """Attach the cancel scope used by ``cancel`` and ``cancel_called``."""
        self._cancel_scope = scope

    def _set_started_value(self, value):
        """Record the value exposed through the ``value`` property."""
        self._started_value = value

    def _set_finished(self):
        """Flag the task as finished, waking up any ``wait`` caller."""
        self._finished_event.set()

    # Properties

    @property
    def cancel_called(self):
        """Whether ``cancel`` has been called on the attached cancel scope."""
        scope = self._cancel_scope
        return scope.cancel_called

    @property
    def finished(self):
        """Whether the finished event has been set."""
        event = self._finished_event
        return event.is_set()

    @property
    def value(self):
        """The recorded started value (``None`` if none was recorded)."""
        return self._started_value

    # Methods

    def cancel(self):
        """Request cancellation through the attached cancel scope."""
        scope = self._cancel_scope
        scope.cancel()

    async def wait(self):
        """Wait for the finished event, then go through one more checkpoint."""
        event = self._finished_event
        await event.wait()
        # NOTE(review): the extra checkpoint presumably lets the task that
        # set the event complete its exit before the waiter resumes - confirm.
        await trio.sleep(0)  # Checkpoint

    async def cancel_and_wait(self):
        """Request cancellation, then wait for the task to be finished."""
        self.cancel()
        await self.wait()
async def wrap_task(corofn, *args, task_status=trio.TASK_STATUS_IGNORED):
    """Run ``corofn(*args)`` in a private nursery and report a ``TaskStatus``.

    ``corofn`` is started with ``nursery.start``, so it has to support the
    ``task_status`` protocol. Once it has started, the value it reported is
    stored on a fresh ``TaskStatus``, which is then handed to our own
    ``task_status.started``.

    The ``TaskStatus`` is flagged as finished when this coroutine leaves the
    nursery block, whether normally or through an exception.
    """
    monitor = TaskStatus()
    try:
        async with trio.open_nursery() as inner_nursery:
            # Attach the scope first, so the monitor can already cancel the
            # task by the time it is reported to the caller.
            monitor._set_cancel_scope(inner_nursery.cancel_scope)
            started_value = await inner_nursery.start(corofn, *args)
            monitor._set_started_value(started_value)
            task_status.started(monitor)
    finally:
        monitor._set_finished()
async def start_task(nursery, corofn, *args, name=None):
    """Start ``corofn(*args)`` through ``wrap_task`` in the given nursery.

    ``name`` is forwarded to ``nursery.start`` as the task name. Returns
    whatever ``nursery.start`` returns, i.e. the value ``wrap_task`` passes
    to ``task_status.started``.
    """
    status = await nursery.start(wrap_task, corofn, *args, name=name)
    return status
# Testing
import pytest | |
@pytest.mark.trio
async def test(autojump_clock):
    """Exercise ``start_task`` on completion, cancellation and both failure modes."""

    async def worker(fail=0, task_status=trio.TASK_STATUS_IGNORED):
        # fail < 0: raise before reporting started.
        # fail > 0: raise after reporting started.
        # fail == 0: run to completion and mark the shared set.
        await trio.sleep(1)
        if fail < 0:
            raise RuntimeError("Oops")
        marker = set()
        task_status.started(marker)
        await trio.sleep(1)
        if fail > 0:
            raise RuntimeError("Oops")
        marker.add(1)

    # Normal completion: the started value is filled in once the task is over.
    async with trio.open_nursery() as nursery:
        status = await start_task(nursery, worker)
        marker = status.value
        assert marker == set()
        assert not status.finished
        await status.wait()
        assert status.finished
        assert marker

    # Cancellation: the task finishes without marking the set.
    async with trio.open_nursery() as nursery:
        status = await start_task(nursery, worker)
        marker = status.value
        assert marker == set()
        assert not status.finished
        await status.cancel_and_wait()
        assert status.cancel_called
        assert status.finished
        assert not status.value

    # Failure before started: the error is raised by the start call itself.
    async with trio.open_nursery() as nursery:
        with pytest.raises(RuntimeError):
            await start_task(nursery, worker, -1)

    # Failure after started: the error comes out of the enclosing nursery.
    with pytest.raises(RuntimeError):
        async with trio.open_nursery() as nursery:
            status = await start_task(nursery, worker, 1)
            marker = status.value
            assert marker == set()
            assert not status.finished
            await status.wait()
            assert False  # pragma: no cover - gets cancelled
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment