Skip to content

Instantly share code, notes, and snippets.

@benfasoli
Created January 23, 2022 17:18
Show Gist options
  • Save benfasoli/650a57923ab1951e1cb6355f033cbc8b to your computer and use it in GitHub Desktop.
Save benfasoli/650a57923ab1951e1cb6355f033cbc8b to your computer and use it in GitHub Desktop.
Limit concurrency with Python asyncio
import asyncio
from typing import Coroutine, List, Sequence
def _limit_concurrency(
coroutines: Sequence[Coroutine], concurrency: int
) -> List[Coroutine]:
"""Decorate coroutines to limit concurrency.
Enforces a limit on the number of coroutines that can run concurrently in higher
level asyncio-compatible concurrency managers like asyncio.gather(coroutines) and
asyncio.as_completed(coroutines).
"""
semaphore = asyncio.Semaphore(concurrency)
async def with_concurrency_limit(coroutine: Coroutine) -> Coroutine:
async with semaphore:
return await coroutine
return [with_concurrency_limit(coroutine) for coroutine in coroutines]
async def sleep_for_seconds(seconds: float) -> None:
print(f"Going to sleep for {seconds} seconds...")
await asyncio.sleep(seconds)
print(f"Woke up after {seconds} seconds!")
async def main():
coroutines = [sleep_for_seconds(1), sleep_for_seconds(2), sleep_for_seconds(3)]
await asyncio.gather(*_limit_concurrency(coroutines, concurrency=2))
if __name__ == "__main__":
asyncio.run(main())
# Going to sleep for 1 seconds...
# Going to sleep for 2 seconds...
# Woke up after 1 seconds!
# Going to sleep for 3 seconds...
# Woke up after 2 seconds!
# Woke up after 3 seconds!
@teodoryantcheff
Copy link

Thanks!

@etherealite
Copy link

etherealite commented Oct 5, 2024

This function does not cleanly handle loop stop conditions. If any coroutine has an unhandled exception that slips through async.gather() you will be left with a bunch of un-awaited coroutines and the errors that come with them.

@benfasoli
Copy link
Author

This function does not cleanly handle loop stop conditions. If any coroutine has an unhandled exception that slips through async.gather() you will be left with a bunch of un-awaited coroutines and the errors that come with them.

This doesn't assume any responsibility for error handing. If a coroutine/task/future raises an exception, that exception bubbles up to the caller in a way that's consistent with whatever asyncio method was used to run the awaitable.

In the case of asyncio.gather(...), there's no implicit cancellation of other awaitables if one raises an error. You'd need to use asyncio.as_completed(...), manage tasks/futures manually, etc. to achieve your desired behavior regardless of whether you choose to limit concurrency.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment