Created
November 7, 2018 14:17
-
-
Save romuald/266f8901bfb0032d01f4e37639dd7587 to your computer and use it in GitHub Desktop.
Simple TaskPool implementation with asyncio
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class TaskPool: | |
| """ | |
| asyncio task pool, run no more than n tasks concurrently | |
| Example: | |
| >>> pool = TaskPool(5) | |
| >>> todo = [session.get(url) for url in urls] | |
| >>> tasks = pool.create_tasks(todo) | |
| Waiting for all results: | |
| >>> [pool.loop.run_until_complete(task) for task in tasks] | |
| No more than 5 GET will be executed a the same time | |
| """ | |
| def __init__(self, size, loop=None): | |
| self.semaphore = asyncio.Semaphore(size) | |
| self.loop = loop or asyncio.get_event_loop() | |
| def create_tasks(self, coroutines): | |
| """ | |
| Returns a list of scheduled tasks, wrapped in our semaphore lock | |
| """ | |
| return [self.loop.create_task(self._wrapped(coro)) | |
| for coro in coroutines] | |
| async def _wrapped(self, coro): | |
| """ | |
| Wraps coroutine execution inside our semaphore lock | |
| """ | |
| await self.semaphore.acquire() | |
| try: | |
| return await coro | |
| finally: | |
| self.semaphore.release() |
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
(Need to add an utility method to iterate over tasks as their result become available)