Created
August 21, 2019 15:54
-
-
Save mprymek/133244919964aa18f57fbda3b55df738 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import asyncio.futures | |
import logging | |
import aiojobs | |
# Root logger: show everything from DEBUG upwards, prefixed with the level
# name left-aligned in a 7-character column.
logging.basicConfig(
    level=logging.DEBUG,
    format='[%(levelname)-7s] %(message)s',
)
# Module-level logger for this file.
logger = logging.getLogger(__name__)
class Nursery:
    '''
    Implements an aionursery-like API over aiojobs' Scheduler.

    Used as an async context manager: jobs are started with `start_soon()`,
    can optionally be waited for with `wait()`, and the underlying scheduler
    is closed when the `async with` block exits.

    There is one big catch: the `exception_handler` given to aiojobs is a
    *sync* function, therefore the async `self._scheduler.close()` cannot be
    called from it directly. Instead the handler sets `_exception_event` and
    a background "killer" task waiting on that event closes the scheduler.
    '''

    # Seconds between two checks of the job count in `wait()`.
    _WAIT_POLL_INTERVAL = 0.1

    def __init__(self, name='nursery'):
        '''Create a nursery; `name` is only used in log messages and repr.'''
        self._name = name
        # Set by `_exception_handler`; wakes up the killer task.
        self._exception_event = asyncio.Event()
        # Both are created in `__aenter__`.
        self._scheduler = None
        self._killer_task = None

    def _exception_handler(self, scheduler, context):
        '''Sync callback passed to aiojobs as `exception_handler`.

        Sets `_exception_event` (which triggers the killer task) and forwards
        `context` to the event loop's exception handler.
        '''
        logging.debug(f'{self._name}: exc handler fired')
        self._exception_event.set()
        # Call the loop's exception handler to log the exception info. The
        # running loop is used instead of the private `scheduler._loop`
        # attribute, which is an aiojobs implementation detail.
        logging.error(80*'-')
        asyncio.get_running_loop().call_exception_handler(context)
        logging.error(80*'-')

    async def __aenter__(self):
        '''Create the scheduler, start the killer task and return `self`.'''
        logging.debug(f'{self._name}: aenter')
        self._scheduler = await aiojobs.create_scheduler(
            limit=None, exception_handler=self._exception_handler)

        async def killer(scheduler, close_event):
            # Sleeps until a job failure is flagged, then closes `scheduler`.
            logging.debug(f'{self._name}: killer started')
            await close_event.wait()
            logging.info(f'{self._name}: killing all jobs')
            await scheduler.close()

        self._killer_task = asyncio.create_task(
            killer(self._scheduler, self._exception_event))
        return self

    async def __aexit__(self, exc_type, exc, tb):
        '''Close the scheduler and stop the killer task.

        Returns None, so an exception raised inside the `async with` block
        is not suppressed.
        '''
        logging.debug(f'{self._name}: aexit')
        if self._scheduler is not None:
            await self._scheduler.close()
        killer_task = self._killer_task
        if killer_task is not None:
            killer_task.cancel()
            # Wait until the cancellation has actually been processed so no
            # pending killer task is left behind. `asyncio.wait` does not
            # re-raise the task's own outcome.
            await asyncio.wait([killer_task])

    async def wait(self):
        '''Return once the scheduler has no jobs left.

        Returns immediately when the nursery has not been entered yet.
        '''
        if self._scheduler is None:
            return
        # Gathering `job.wait()` for every job was observed to keep
        # `_exception_handler` from being called -- presumably aiojobs
        # treats the exception of an explicitly awaited job as handled by
        # the awaiter (TODO confirm). Hence this "semi-busy waiting" loop.
        while len(self._scheduler):
            await asyncio.sleep(self._WAIT_POLL_INTERVAL)

    def __repr__(self):
        # The scheduler does not exist before `__aenter__`; report 0 jobs
        # instead of failing on `len(None)`.
        jobs = 0 if self._scheduler is None else len(self._scheduler)
        return f'<Nursery {self._name} jobs={jobs}>'

    async def start_soon(self, func, *args, **kwargs):
        '''Spawn `func(*args, **kwargs)` as a job of this nursery.

        Raises:
            RuntimeError: if the nursery has not been entered yet.
        '''
        if self._scheduler is None:
            raise RuntimeError(
                f'{self._name}: start_soon() used outside of "async with"')
        await self._scheduler.spawn(func(*args, **kwargs))
async def task1(name, delay=0.5, count_to=4, raise_at=None):
    '''Demo job: count from 0 to `count_to - 1`, logging every step.

    Args:
        name: label used in the log messages.
        delay: seconds to sleep after each step.
        count_to: number of steps.
        raise_at: step at which to raise `RuntimeError`; None never raises.

    Raises:
        RuntimeError: when the counter reaches `raise_at`.
        asyncio.CancelledError: re-raised after logging when the task is
            cancelled.
    '''
    try:
        logging.info(f'{name}: start')
        for i in range(count_to):
            logging.info(f'{name}: {i}')
            if i == raise_at:
                raise RuntimeError(f'error in task {name}')
            await asyncio.sleep(delay)
        logging.info(f'{name}: end')
    except asyncio.CancelledError:
        logging.info(f'{name}: cancelled')
        # Propagate the cancellation: swallowing it would make a cancelled
        # task look as if it had finished normally.
        raise
async def check_tasks(settle_time=0.1):
    '''Log an error report when unexpected tasks are still pending.

    Exactly one unfinished task is expected -- presumably the one running
    this coroutine; any other count is reported together with the list of
    unfinished tasks.

    Args:
        settle_time: seconds to sleep first, giving already finished or
            cancelled tasks a chance to be cleaned up.
    '''
    await asyncio.sleep(settle_time)
    # `asyncio.all_tasks()` replaces `asyncio.Task.all_tasks()`, which was
    # removed in Python 3.9; `task.done()` replaces the check against the
    # private `_state` / `_PENDING` names.
    pending = [task for task in asyncio.all_tasks() if not task.done()]
    if len(pending) != 1:
        logging.info('************ ERROR! Pending tasks!')
        for task in pending:
            logging.info(f'\t{task}')
async def test1():
    '''Baseline scenario: two `task1` jobs on a plain aiojobs scheduler.

    Spawns the jobs, waits for all of them, closes the scheduler and then
    runs `check_tasks()`.
    '''
    logging.info('*********** classic aiojobs scheduler')
    plain_scheduler = await aiojobs.create_scheduler()
    for idx in range(2):
        job_name = f's1t{idx}'
        await plain_scheduler.spawn(task1(job_name))
    logging.info('scheduler1: tasks spawned')
    logging.info(plain_scheduler)

    logging.info('waiting for jobs in scheduler1 to end')
    # Wait for every job; exceptions are collected instead of raised.
    await asyncio.gather(
        *(job.wait() for job in plain_scheduler),
        return_exceptions=True,
    )

    await plain_scheduler.close()
    logging.info('scheduler1: closed')
    logging.info(plain_scheduler)
    await check_tasks()
async def test2():
    '''Scenario: leave the nursery without waiting for its jobs.

    Two `task1` jobs (n1t0, n1t1) are started and the `async with` block is
    left immediately, which closes the nursery's scheduler while they are
    still running. Finishes with `check_tasks()`.
    '''
    # Only tasks 0 and 1 exist here; the message used to say "0 and 2".
    logging.info('*********** nursery without waiting - tasks 0 and 1 should be killed')
    async with Nursery('n1') as nursery:
        for i in range(2):
            await nursery.start_soon(task1, f'n1t{i}')
        logging.info('nursery n1: tasks started')
        logging.info(nursery)
    logging.info('nursery n1 closed')
    await check_tasks()
async def test3():
    '''Scenario: start two `task1` jobs in a nursery and wait for them.

    `nursery.wait()` is awaited before the `async with` block is left, then
    `check_tasks()` is run.
    '''
    logging.info('*********** nursery with waiting - tasks should end cleanly')
    async with Nursery('n2') as n2:
        for idx in range(2):
            job_name = f'n2t{idx}'
            await n2.start_soon(task1, job_name)
        logging.info('nursery n2: tasks started')
        logging.info(n2)

        logging.info('waiting for jobs in nursery n2')
        await n2.wait()

    logging.info('nursery n2 closed')
    await check_tasks()
async def test4():
    '''Scenario: one of three nursery jobs raises.

    Jobs n3t0..n3t2 are started; n3t1 is told to raise at step 1. The
    nursery is then waited for, left, and `check_tasks()` is run.
    '''
    logging.info('*********** nursery with exception raising - tasks 0 and 2 should be killed')
    async with Nursery('n3') as n3:
        for idx in range(3):
            # Only the middle job fails; `None` means "never raise".
            fail_step = 1 if idx == 1 else None
            await n3.start_soon(task1, f'n3t{idx}', raise_at=fail_step)
        logging.info('nursery n3: tasks started')
        logging.info(n3)

        logging.info('waiting for jobs in nursery n3')
        await n3.wait()

    logging.info('nursery n3 closed')
    await check_tasks()
async def main():
    '''Run the four demo scenarios one after another, in order.'''
    for scenario in (test1, test2, test3, test4):
        await scenario()


# Script entry point: run all scenarios on a fresh event loop.
if __name__ == '__main__':
    asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment