Skip to content

Instantly share code, notes, and snippets.

@vxgmichel
Last active January 29, 2019 11:54
Show Gist options
  • Save vxgmichel/8ac8729e0f0cf58ee49e0fcc6c76a7b4 to your computer and use it in GitHub Desktop.
Save vxgmichel/8ac8729e0f0cf58ee49e0fcc6c76a7b4 to your computer and use it in GitHub Desktop.
Helper to wrap and monitor a trio task
import attr
import trio
@attr.s
class TaskStatus:
# Internal state
_cancel_scope = attr.ib(default=None)
_started_value = attr.ib(default=None)
_finished_event = attr.ib(factory=trio.Event)
def _set_cancel_scope(self, scope):
self._cancel_scope = scope
def _set_started_value(self, value):
self._started_value = value
def _set_finished(self):
self._finished_event.set()
# Properties
@property
def cancel_called(self):
return self._cancel_scope.cancel_called
@property
def finished(self):
return self._finished_event.is_set()
@property
def value(self):
return self._started_value
# Methods
def cancel(self):
self._cancel_scope.cancel()
async def wait(self):
await self._finished_event.wait()
await trio.sleep(0) # Checkpoint
async def cancel_and_wait(self):
self.cancel()
await self.wait()
async def wrap_task(corofn, *args, task_status=trio.TASK_STATUS_IGNORED):
status = TaskStatus()
try:
async with trio.open_nursery() as nursery:
status._set_cancel_scope(nursery.cancel_scope)
value = await nursery.start(corofn, *args)
status._set_started_value(value)
task_status.started(status)
finally:
status._set_finished()
async def start_task(nursery, corofn, *args, name=None):
return await nursery.start(wrap_task, corofn, *args, name=name)
# Testing
import pytest
@pytest.mark.trio
async def test(autojump_clock):
async def job(fail=0, task_status=trio.TASK_STATUS_IGNORED):
await trio.sleep(1)
if fail < 0:
raise RuntimeError("Oops")
success = set()
task_status.started(success)
await trio.sleep(1)
if fail > 0:
raise RuntimeError("Oops")
success.add(1)
async with trio.open_nursery() as nursery:
status = await start_task(nursery, job)
success = status.value
assert success == set()
assert not status.finished
await status.wait()
assert status.finished
assert success
async with trio.open_nursery() as nursery:
status = await start_task(nursery, job)
success = status.value
assert success == set()
assert not status.finished
await status.cancel_and_wait()
assert status.cancel_called
assert status.finished
assert not status.value
async with trio.open_nursery() as nursery:
with pytest.raises(RuntimeError):
status = await start_task(nursery, job, -1)
with pytest.raises(RuntimeError):
async with trio.open_nursery() as nursery:
status = await start_task(nursery, job, +1)
success = status.value
assert success == set()
assert not status.finished
await status.wait()
assert False # pragma: no cover - gets cancelled
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment