@romuald
Created November 7, 2018 14:17
Simple TaskPool implementation with asyncio
import asyncio


class TaskPool:
    """
    asyncio task pool, runs no more than n tasks concurrently

    Example:
    >>> pool = TaskPool(5)
    >>> todo = [session.get(url) for url in urls]
    >>> tasks = pool.create_tasks(todo)

    Waiting for all results:
    >>> [pool.loop.run_until_complete(task) for task in tasks]

    No more than 5 GETs will be executed at the same time
    """

    def __init__(self, size, loop=None):
        self.semaphore = asyncio.Semaphore(size)
        self.loop = loop or asyncio.get_event_loop()

    def create_tasks(self, coroutines):
        """
        Returns a list of scheduled tasks, wrapped in our semaphore lock
        """
        return [self.loop.create_task(self._wrapped(coro))
                for coro in coroutines]

    async def _wrapped(self, coro):
        """
        Wraps coroutine execution inside our semaphore lock
        """
        # "async with" acquires the semaphore and always releases it,
        # even if the wrapped coroutine raises
        async with self.semaphore:
            return await coro
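A minimal usage sketch that checks the concurrency cap described in the docstring. It assumes Python 3.7+ (for `asyncio.run`). The `job` coroutine is a hypothetical stand-in for `session.get(url)`, and `demo`, `running` and `peak` are names invented for this illustration. The class is repeated in condensed form only so that the block runs on its own.

```python
import asyncio


class TaskPool:
    """Condensed copy of the class above, so this sketch is self-contained."""

    def __init__(self, size, loop=None):
        self.semaphore = asyncio.Semaphore(size)
        self.loop = loop or asyncio.get_event_loop()

    def create_tasks(self, coroutines):
        return [self.loop.create_task(self._wrapped(coro))
                for coro in coroutines]

    async def _wrapped(self, coro):
        async with self.semaphore:
            return await coro


async def demo(n_jobs=10, limit=3):
    running = 0  # jobs currently inside the semaphore
    peak = 0     # highest value "running" ever reached

    async def job(i):  # hypothetical stand-in for session.get(url)
        nonlocal running, peak
        running += 1
        peak = max(peak, running)
        await asyncio.sleep(0.01)
        running -= 1
        return i * 2

    # Created inside a coroutine, so the pool picks up the running loop
    pool = TaskPool(limit)
    tasks = pool.create_tasks(job(i) for i in range(n_jobs))
    results = await asyncio.gather(*tasks)  # results in submission order
    return results, peak


results, peak = asyncio.run(demo())
print(results, peak)
```

Here the pool is built inside a running coroutine and the tasks are awaited with `asyncio.gather`, rather than calling `pool.loop.run_until_complete` from synchronous code as in the docstring. Both approaches wait for every task; which one fits depends on whether the caller is already inside an event loop.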

romuald commented Nov 7, 2018

(Need to add a utility method to iterate over tasks as their results become available)
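One possible shape for such a utility, sketched with the standard `asyncio.as_completed`. This is not the author's implementation: `iter_results` is a hypothetical name, and it is written as a free function over any list of tasks (such as the list returned by `create_tasks`) rather than as a `TaskPool` method. It assumes Python 3.7+.

```python
import asyncio


async def iter_results(tasks):
    """Yield each task's result as soon as it is available.

    Hypothetical helper, not part of the original gist.
    """
    # as_completed hands back awaitables in completion order
    for future in asyncio.as_completed(tasks):
        # Re-raises here if the underlying task failed
        yield await future


async def demo():
    async def job(delay):  # hypothetical workload
        await asyncio.sleep(delay)
        return delay

    # Submitted slowest-first to show that the output order differs
    tasks = [asyncio.create_task(job(d)) for d in (0.15, 0.05, 0.1)]
    return [result async for result in iter_results(tasks)]


order = asyncio.run(demo())
print(order)
```

Results arrive in completion order, not submission order, so a caller that needs to match results back to inputs would have to return an identifier alongside each result.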
