Created
August 9, 2023 11:08
-
-
Save clbarnes/a5e17e20e321d16bb00d75315a1b9ab7 to your computer and use it in GitHub Desktop.
Semaphore-based asyncio rate limiters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio as aio | |
from collections import deque | |
from typing import Awaitable, Iterable, TypeVar | |
T = TypeVar("T") | |
class BaseLimit:
    """Pass-through limiter that applies no throttling.

    ``ConcurrencyLimit`` and ``RateLimit`` subclass this and override
    ``limit`` with their own gating.
    """

    async def limit(self, awa: Awaitable[T]) -> T:
        """Run ``awa`` to completion and return its result.

        The awaitable is wrapped with ``asyncio.ensure_future`` before it is
        awaited; nothing else is done to it.
        """
        future = aio.ensure_future(awa)
        result = await future
        return result
class ConcurrencyLimit(BaseLimit):
    """Limiter that bounds how many awaitables are in flight at once.

    A semaphore permit is acquired before each awaitable is started and is
    released from a done-callback on the resulting future.
    """

    def __init__(self, count: int) -> None:
        """Create a limiter backed by a semaphore with ``count`` permits."""
        self.semaphore = aio.Semaphore(count)
        super().__init__()

    async def limit(self, awa: Awaitable[T]) -> T:
        """Wait for a free permit, then run ``awa`` and return its result.

        Raises:
            TypeError: propagated from ``asyncio.ensure_future`` when ``awa``
                is not awaitable. The permit is handed back first.
        """
        await self.semaphore.acquire()
        try:
            fut = aio.ensure_future(awa)
        except BaseException:
            # No future exists, so the done-callback below can never fire;
            # release here or the permit would be lost for good.
            self.semaphore.release()
            raise

        def release_callback(_fut):
            self.semaphore.release()

        # Done-callbacks run however the future ends (result, exception or
        # cancellation), so the permit is returned in every case.
        fut.add_done_callback(release_callback)
        return await fut
class RateLimit(BaseLimit):
    """Limiter that starts at most ``count`` awaitables per ``seconds``.

    Starting an awaitable takes a semaphore permit. The permit is given back
    by a timer ``seconds`` after the start, independently of how long the
    awaitable itself runs.
    """

    def __init__(self, count: int = 1, seconds: float = 1) -> None:
        """Create a limiter with ``int(count)`` permits per ``seconds`` window."""
        self.semaphore = aio.Semaphore(int(count))
        self.seconds = seconds
        # Holds the pending timer tasks until they fire; each task removes
        # itself from this set in its done-callback.
        self._wait_tasks = set()
        super().__init__()

    def _schedule_semaphore_release(self):
        """Start a ``seconds``-long timer task that releases one permit."""

        def release_callback(task):
            self.semaphore.release()
            self._wait_tasks.discard(task)

        wait = aio.create_task(aio.sleep(self.seconds))
        wait.add_done_callback(release_callback)
        self._wait_tasks.add(wait)

    async def limit(self, awa: Awaitable[T]) -> T:
        """Wait for a free permit, then run ``awa`` and return its result.

        Raises:
            TypeError: propagated from ``asyncio.ensure_future`` when ``awa``
                is not awaitable. Nothing was started, so the permit is
                released immediately instead of after ``seconds``.
        """
        await self.semaphore.acquire()
        try:
            task = aio.ensure_future(awa)
        except BaseException:
            # Without this the timed release below is never scheduled and
            # the permit would be lost for good.
            self.semaphore.release()
            raise
        self._schedule_semaphore_release()
        return await task
def sliding_window(awas: Iterable[Awaitable[T]], concurrency: int, ordered=True) -> Iterable[Awaitable[T]]:
    """Lazily schedule ``awas``, starting new ones as earlier ones are consumed.

    Up to ``concurrency`` items are wrapped with ``asyncio.ensure_future`` up
    front. Each time the consumer asks for the next element, one more item is
    pulled from ``awas`` and scheduled. The in-flight bound therefore holds
    when each yielded awaitable is awaited before the generator is advanced.

    Items are scheduled while the generator is being iterated, so it is meant
    to be iterated from code running on the event loop.

    Args:
        awas: awaitables to schedule; consumed lazily.
        concurrency: size of the window; must be at least 1.
        ordered: when true (the default), yield futures in input order.
            Otherwise delegate to ``_sliding_window_unordered``.

    Yields:
        Awaitables resolving to the results of the scheduled items.

    Raises:
        ValueError: on first iteration, if ``concurrency`` is less than 1.
    """
    if concurrency < 1:
        # A zero-sized window would end the iteration immediately and drop
        # every input awaitable without scheduling it.
        raise ValueError(f"concurrency must be at least 1, got {concurrency!r}")

    if not ordered:
        yield from _sliding_window_unordered(awas, concurrency)
        return

    awa_it = iter(awas)
    window = deque(maxlen=concurrency)

    # range() comes first in zip() so that, once the window is full, zip stops
    # without pulling (and losing) an extra item from awa_it.
    for _, item in zip(range(concurrency), awa_it):
        window.append(aio.ensure_future(item))

    # A short initial fill means the input ran out while priming.
    it_empty = len(window) < concurrency

    while window:
        yield window.popleft()
        if it_empty:
            continue
        try:
            item = next(awa_it)
        except StopIteration:
            it_empty = True
        else:
            window.append(aio.ensure_future(item))
def _sliding_window_unordered(awas: Iterable[Awaitable[T]], concurrency: int) -> Iterable[Awaitable[T]]:
    """Lazily schedule ``awas`` and yield waiters in completion order.

    Up to ``concurrency`` items are scheduled up front. Each yielded awaitable
    resolves to the result of whichever scheduled item finishes next, and
    re-raises that item's exception if it failed. One more input item is
    scheduled each time the consumer advances the generator, and exactly one
    waiter is yielded per input item.

    Args:
        awas: awaitables to schedule; consumed lazily.
        concurrency: size of the window; must be at least 1.

    Raises:
        ValueError: on first iteration, if ``concurrency`` is less than 1.
    """
    if concurrency < 1:
        # With nothing scheduled, the first waiter would block forever on the
        # empty completion queue.
        raise ValueError(f"concurrency must be at least 1, got {concurrency!r}")

    done = aio.Queue()
    awa_it = iter(awas)
    todo = set()

    def _on_completion(f):
        todo.remove(f)
        done.put_nowait(f)

    async def _wait_for_one():
        f = await done.get()
        return f.result()

    def _start(awa):
        # Schedule one item and route its completion into the done queue.
        t = aio.ensure_future(awa)
        t.add_done_callback(_on_completion)
        todo.add(t)

    # Number of scheduled items whose waiter has not been yielded yet.
    count = 0
    # range() comes first in zip() so that, once the window is full, zip stops
    # without pulling (and losing) an extra item from awa_it.
    for _, i in zip(range(concurrency), awa_it):
        _start(i)
        count += 1

    # A short initial fill means the input ran out while priming.
    it_empty = count < concurrency

    while not it_empty:
        yield _wait_for_one()
        count -= 1
        try:
            i = next(awa_it)
        except StopIteration:
            it_empty = True
        else:
            _start(i)
            count += 1

    # Input exhausted: hand out one waiter per remaining scheduled item.
    while count > 0:
        yield _wait_for_one()
        count -= 1
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment