Skip to content

Instantly share code, notes, and snippets.

@x42005e1f
Last active November 10, 2025 01:36
Show Gist options
  • Save x42005e1f/976f84ac395ae2b51f14c8507be0ea34 to your computer and use it in GitHub Desktop.
Save x42005e1f/976f84ac395ae2b51f14c8507be0ea34 to your computer and use it in GitHub Desktop.
A rate limiter (async-aware only)
#!/usr/bin/env python3
# SPDX-FileCopyrightText: 2022 Ilya Egorov <[email protected]>
# SPDX-License-Identifier: 0BSD
__all__ = (
"RPSLock",
)
from functools import wraps
from itertools import repeat
from collections import deque
from anyio import Lock, Semaphore, WouldBlock, current_time, sleep_until
class RPSLock:
__slots__ = (
"__weakref__",
"__acquire_lock", "__lock",
"enter_timestamps", "exit_timestamps",
"rate", "width",
)
def __new__(cls, /, rate=1, width=1):
self = super().__new__(cls)
self.__acquire_lock = None
self.__lock = None
self.enter_timestamps = deque(repeat(0, rate), rate)
self.exit_timestamps = deque(repeat(0, rate), rate)
self.rate = rate
self.width = width
return self
def __getnewargs__(self, /):
return (self.rate, self.width)
def __getstate__(self, /):
return None
def __repr__(self, /):
return f"RPSLock(rate={self.rate}, width={self.width})"
async def __aenter__(self, /):
await self.acquire()
return self
async def __aexit__(self, /, exc_type, exc_value, traceback):
self.release()
def __call__(self, func, /):
@wraps(func)
async def wrapper(*args, **kwargs):
async with self:
return await func(*args, **kwargs)
return wrapper
async def acquire(self, /):
async with self.acquire_lock:
lock = self.lock
await lock.acquire()
try:
rate = self.rate
width = self.width
if timestamp := self.enter_timestamps[-1]:
enter_deadline = timestamp + width / rate
else:
enter_deadline = 0
if timestamp := self.exit_timestamps[rate - lock.value - 1]:
exit_deadline = timestamp + width
else:
exit_deadline = 0
await sleep_until(max(enter_deadline, exit_deadline))
self.enter_timestamps.append(current_time())
except BaseException:
lock.release()
raise
def acquire_nowait(self, /):
if self.locked():
raise WouldBlock
lock = self.lock
lock.acquire_nowait()
try:
self.enter_timestamps.append(current_time())
except BaseException:
lock.release()
raise
def release(self, /):
self.exit_timestamps.append(current_time())
self.lock.release()
def locked(self, /):
acquire_lock = self.__acquire_lock
if acquire_lock is None:
return False
if acquire_lock.locked():
return True
lock = self.__lock
if lock is None:
return False
if not lock.value:
return True
time = current_time()
rate = self.rate
width = self.width
if timestamp := self.enter_timestamps[-1]:
if timestamp + width / rate > time:
return True
if timestamp := self.exit_timestamps[rate - lock.value]:
if timestamp + width > time:
return True
return False
@property
def acquire_lock(self, /):
acquire_lock = self.__acquire_lock
if acquire_lock is None:
self.__acquire_lock = acquire_lock = Lock()
return acquire_lock
@property
def lock(self, /):
lock = self.__lock
if lock is None:
self.__lock = lock = Semaphore(self.rate, max_value=self.rate)
return lock
@x42005e1f
Copy link
Author

This is a simple implementation of a rate limiter I once wrote as part of a complex package that implicitly and dynamically compiles bundles of requests to an external API. It will be available here until an alternative RateLimiter is added to the aiologic package.

It simultaneously complies with two rate limits: a weak one and a strict one. The weak one sets an entry constraint: a task cannot enter the context until enough time (width / rate seconds) has passed since the previous entry of a task. Strict sets an exit constraint: a task cannot enter the context until there are less than the maximum number of exits in the current window of width size.

For example, if each task spends 1/5 second on its work, then simultaneous launch of 20 such tasks with RPSLock(10, 1) will take ~2.3 seconds: 0.1 * 19 = 1.9 seconds for weak constraint, 0.2 seconds for strict constraint (after the first bunch of 10 tasks) and another 0.2 seconds to wait for the last task to complete. In contrast, RPSLock(1, 1/10) will always trigger the strict constraint and will take ~5.9 seconds: 0.2 * 20 = 4 seconds to wait for each task to complete and 0.1 * 19 = 1.9 seconds for the strict constraint. This achieves the best possible compliance with the rate limits of external services.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment