Last active
November 10, 2025 01:36
-
-
Save x42005e1f/976f84ac395ae2b51f14c8507be0ea34 to your computer and use it in GitHub Desktop.
A rate limiter (async-aware only)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| # SPDX-FileCopyrightText: 2022 Ilya Egorov <[email protected]> | |
| # SPDX-License-Identifier: 0BSD | |
| __all__ = ( | |
| "RPSLock", | |
| ) | |
| from functools import wraps | |
| from itertools import repeat | |
| from collections import deque | |
| from anyio import Lock, Semaphore, WouldBlock, current_time, sleep_until | |
class RPSLock:
    """An async rate limiter that enforces two constraints at once.

    * Entry constraint: a new entry waits until ``width / rate`` seconds
      have passed since the previous entry.
    * Exit constraint: a new entry waits until ``width`` seconds have
      passed since the exit recorded at index ``rate - lock.value - 1``
      of ``exit_timestamps`` (a deque holding the last ``rate`` exits).

    At most ``rate`` holders are inside at the same time (guarded by a
    semaphore whose initial and maximum value are ``rate``).

    The instance can be used as an async context manager
    (``async with limiter:``) or as a decorator for coroutine functions
    (``@limiter``).

    Parameters:
        rate: Used as the length of both timestamp deques and as the
            semaphore size.
        width: Window size, in the same units as ``current_time()``.
    """

    __slots__ = (
        "__weakref__",
        "__acquire_lock", "__lock",
        "enter_timestamps", "exit_timestamps",
        "rate", "width",
    )

    def __new__(cls, /, rate=1, width=1):
        """Create a limiter; the synchronization primitives are lazy."""
        self = super().__new__(cls)

        # Created on first access via the `acquire_lock` / `lock`
        # properties rather than here.
        self.__acquire_lock = None
        self.__lock = None

        # Both deques are pre-filled with 0, which is treated as a falsy
        # "nothing recorded yet" placeholder by acquire() and locked().
        self.enter_timestamps = deque(repeat(0, rate), rate)
        self.exit_timestamps = deque(repeat(0, rate), rate)

        self.rate = rate
        self.width = width

        return self

    def __getnewargs__(self, /):
        """Return the ``(rate, width)`` arguments to pass to ``__new__``."""
        return (self.rate, self.width)

    def __getstate__(self, /):
        """Return no state: only ``rate`` and ``width`` are carried over."""
        return None

    def __repr__(self, /):
        return f"RPSLock(rate={self.rate}, width={self.width})"

    async def __aenter__(self, /):
        """Acquire the limiter and return it."""
        await self.acquire()

        return self

    async def __aexit__(self, /, exc_type, exc_value, traceback):
        """Release the limiter; exceptions are not suppressed."""
        self.release()

    def __call__(self, func, /):
        """Decorate coroutine function *func* to run inside the limiter."""

        @wraps(func)
        async def wrapper(*args, **kwargs):
            async with self:
                return await func(*args, **kwargs)

        return wrapper

    async def acquire(self, /):
        """Wait until both constraints allow an entry, then enter.

        On success the semaphore slot stays held until ``release()`` is
        called. If anything is raised while waiting (including
        cancellation), the slot is given back before re-raising.
        """
        # Callers pass through this section one at a time.
        async with self.acquire_lock:
            lock = self.lock

            await lock.acquire()

            try:
                rate = self.rate
                width = self.width

                if timestamp := self.enter_timestamps[-1]:
                    enter_deadline = timestamp + width / rate
                else:
                    enter_deadline = 0  # no previous entry recorded

                # `lock.value` was already decremented by the acquire
                # above, so the index is `rate - lock.value - 1`.
                if timestamp := self.exit_timestamps[rate - lock.value - 1]:
                    exit_deadline = timestamp + width
                else:
                    exit_deadline = 0  # placeholder slot, nothing to wait for

                await sleep_until(max(enter_deadline, exit_deadline))

                self.enter_timestamps.append(current_time())
            except BaseException:
                lock.release()
                raise

    def acquire_nowait(self, /):
        """Enter immediately or raise ``WouldBlock``.

        ``WouldBlock`` is raised when ``locked()`` reports that an entry
        is not possible right now.
        """
        if self.locked():
            raise WouldBlock

        lock = self.lock
        lock.acquire_nowait()

        try:
            self.enter_timestamps.append(current_time())
        except BaseException:
            lock.release()
            raise

    def release(self, /):
        """Record the exit time and give the semaphore slot back."""
        self.exit_timestamps.append(current_time())
        self.lock.release()

    def locked(self, /):
        """Return ``True`` if an entry is not possible right now.

        The private attributes are read directly, so calling this never
        creates the underlying primitives.
        """
        acquire_lock = self.__acquire_lock

        # The acquire lock exists only once acquire() has been used. Its
        # absence just means nobody is waiting in acquire(); it must not
        # short-circuit the checks below, otherwise a limiter driven
        # solely through acquire_nowait() would never report being locked.
        if acquire_lock is not None and acquire_lock.locked():
            return True

        lock = self.__lock

        if lock is None:
            # Timestamps are only written after the semaphore has been
            # created, so nothing has been recorded yet.
            return False

        if not lock.value:
            return True  # every slot is taken

        now = current_time()
        rate = self.rate
        width = self.width

        if timestamp := self.enter_timestamps[-1]:
            if timestamp + width / rate > now:
                return True

        # Nothing has been acquired here, so the index that acquire()
        # would use after decrementing the value is `rate - lock.value`.
        if timestamp := self.exit_timestamps[rate - lock.value]:
            if timestamp + width > now:
                return True

        return False

    @property
    def acquire_lock(self, /):
        """The lock guarding ``acquire()``, created on first access."""
        acquire_lock = self.__acquire_lock

        if acquire_lock is None:
            self.__acquire_lock = acquire_lock = Lock()

        return acquire_lock

    @property
    def lock(self, /):
        """The semaphore of ``rate`` slots, created on first access."""
        lock = self.__lock

        if lock is None:
            self.__lock = lock = Semaphore(self.rate, max_value=self.rate)

        return lock
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is a simple implementation of a rate limiter I once wrote as part of a complex package that implicitly and dynamically compiles bundles of requests to an external API. It will be available here until an alternative `RateLimiter` is added to the aiologic package.

It simultaneously complies with two rate limits: a weak one and a strict one. The weak one sets an entry constraint: a task cannot enter the context until enough time (`width / rate` seconds) has passed since the previous entry of a task. The strict one sets an exit constraint: a task cannot enter the context until there are fewer than the maximum number of exits in the current window of `width` size.

For example, if each task spends 1/5 second on its work, then simultaneously launching 20 such tasks with `RPSLock(10, 1)` will take ~2.3 seconds: 0.1 * 19 = 1.9 seconds for the weak constraint, 0.2 seconds for the strict constraint (after the first bunch of 10 tasks), and another 0.2 seconds to wait for the last task to complete. In contrast, `RPSLock(1, 1/10)` will always trigger the strict constraint and will take ~5.9 seconds: 0.2 * 20 = 4 seconds to wait for each task to complete and 0.1 * 19 = 1.9 seconds for the strict constraint. This achieves the best possible compliance with the rate limits of external services.