Last active
November 4, 2021 04:53
-
-
Save mgd020/804a27b8b113e1ff535e04889db9d145 to your computer and use it in GitHub Desktop.
Django throttling class using cache
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from typing import List, NamedTuple | |
from django.core.cache import caches | |
class RateLimit(NamedTuple):
    """One throttling rule: at most ``limit`` hits per ``seconds``-long window."""

    # Length of one counting window, in seconds.
    seconds: int
    # Number of hits allowed inside a single window before throttling starts.
    limit: int
class Throttle:
    """
    Determine if a subject has been throttled given a collection of rates.

    Counts are stored in a Django cache (the ``default`` alias unless another
    cache object is passed in). Each rate limit uses fixed windows: time is
    divided into consecutive ``seconds``-long chunks and one counter is kept
    per (rate, chunk, key) combination.

    Example:

        # 5 per second and 10 every 5 seconds
        throttle = Throttle([RateLimit(1, 5), RateLimit(5, 10)])

        def my_view(request):
            retry_after = throttle.check('request_log')
            if retry_after:
                print('request was throttled')
                response = HttpResponse(status=429)
                response['retry-after'] = retry_after
                return response
            print('request was not throttled')
            return HttpResponse()
    """

    __slots__ = ['rate_limits', 'cache']

    def __init__(
        self,
        rate_limits: List['RateLimit'],
        *,
        cache=None,
    ):
        """
        Args:
            rate_limits: ``(seconds, limit)`` pairs; all of them are enforced.
            cache: cache object providing ``incr``, ``add`` and
                ``delete_many``. Defaults to ``caches['default']``.
        """
        self.rate_limits = rate_limits
        self.cache = caches['default'] if cache is None else cache

    @staticmethod
    def _cache_key(rate: int, time_chunk: int, key: str) -> str:
        """Build the counter key; shared by check() and reset() so they always agree."""
        return f'throttle:{rate}:{time_chunk}:{key}'

    def check(self, key: str) -> int:
        """
        Record one hit for ``key`` and report whether it is throttled.

        Every call increments the counter of every rate limit, including
        calls that end up being throttled.

        Returns:
            0 when no rate limit is exceeded, otherwise the number of seconds
            until the latest-ending exceeded window is over.
        """
        incr = self.cache.incr
        add = self.cache.add
        retry_after = 0
        now = int(time.time())
        for rate, limit in self.rate_limits:
            time_chunk = now // rate
            cache_key = self._cache_key(rate, time_chunk, key)
            while True:
                try:
                    count = incr(cache_key)
                    break
                except ValueError:
                    # No counter yet for this window: try to create it with a
                    # timeout of one window length. If add() reports that the
                    # key already exists (someone else created it in the
                    # meantime), loop back and increment instead.
                    count = 1
                    if add(cache_key, count, rate):
                        break
            if count > limit:
                next_time_chunk_start = (time_chunk + 1) * rate
                retry_after = max(retry_after, next_time_chunk_start - now)
        return retry_after

    def reset(self, key: str):
        """Delete the current-window counters of ``key`` for every rate limit."""
        now = int(time.time())
        # The keys must match exactly what check() writes (rate included),
        # otherwise nothing is actually cleared.
        self.cache.delete_many(
            [self._cache_key(rate, now // rate, key) for rate, _ in self.rate_limits]
        )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment