Created
February 7, 2018 13:02
-
-
Save maxpoletaev/49bad3d4c390fed692ba572b32268da5 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
from django.core.cache import cache as default_cache | |
from rest_framework.exceptions import Throttled | |
class RateLimitThrottle:
    """
    An alternative to Rest Framework's throttling feature that is driven
    explicitly by the caller instead of through a ``throttle_classes``
    attribute. The example below gives 10 login attempts in an hour to
    user "john".

        throttle = RateLimitThrottle(scope='login', rate='10/hour', ident='john')
        throttle.is_limit_reached(raise_exception=True)
        throttle.register_attempt()

    The attempt history is a list of timestamps (newest first) stored in the
    cache under ``cache_key_format``. Note that the key is built from scope
    and ident only, so throttles with the same scope and ident but different
    rates read and write the same stored history.
    """

    cache_key_format = 'throttle_{scope}_{ident}'
    exception_class = Throttled
    cache = default_cache
    timer = time.time

    def __init__(self, scope, rate, ident):
        self.scope = scope
        self.now = self.timer()
        self.ident = self.parse_ident(ident)
        self.num_requests, self.duration = self.parse_rate(rate)
        # Populated lazily by _load_history() so that register_attempt() and
        # wait() are safe to call even if is_limit_reached() was not called.
        self.cache_key = None
        self.history = None

    def parse_ident(self, ident):
        """
        This method could be overridden for using any data structure as an ident.
        """
        return ident

    def get_cache_key(self):
        """
        Returns cache key based on scope and ident, or None when there is
        no ident to throttle on.
        """
        if not self.ident:
            return None
        return self.cache_key_format.format(scope=self.scope, ident=self.ident)

    def _load_history(self):
        """
        Resolves the cache key and loads the stored history, dropping the
        timestamps that have fallen out of the current window.
        """
        self.cache_key = self.get_cache_key()
        if self.cache_key is None:
            self.history = []
            return

        history = self.cache.get(self.cache_key, [])
        if self.duration is not None:
            # The newest timestamps are at the front, so expired ones are
            # removed from the end of the list.
            cutoff = self.now - self.duration
            while history and history[-1] <= cutoff:
                history.pop()
        self.history = history

    def is_limit_reached(self, raise_exception=False):
        """
        Returns True when the number of registered attempts inside the
        current window has reached the allowed number, False otherwise.

        When there is no ident, or no rate was given, nothing is throttled
        and False is returned. If ``raise_exception`` is true, an instance of
        ``exception_class`` is raised instead of returning True.
        """
        self._load_history()
        if self.cache_key is None or self.num_requests is None:
            return False

        if len(self.history) >= self.num_requests:
            if raise_exception:
                raise self.exception_class(wait=self.wait())
            return True
        return False

    def register_attempt(self):
        """
        Records the current attempt in the cached history.

        Does nothing when there is no ident or no rate, because in both
        cases there is no window to count attempts against.
        """
        if self.history is None:
            self._load_history()
        if self.cache_key is None or self.duration is None:
            return
        self.history.insert(0, self.now)
        self.cache.set(self.cache_key, self.history, self.duration)

    def parse_rate(self, rate):
        """
        Given the request rate string, return a two tuple of:
        <allowed number of requests>, <period of time in seconds>

        Only the first letter of the period is significant ('s', 'm', 'h'
        or 'd'). A rate of None yields (None, None), meaning "no limit".
        """
        if rate is None:
            return (None, None)
        num, period = rate.split('/')
        num_requests = int(num)
        duration = {'s': 1, 'm': 60, 'h': 3600, 'd': 86400}[period[0]]
        return (num_requests, duration)

    def wait(self):
        """
        Returns the recommended next request time in seconds, or None when
        it cannot be computed (no rate, or no request slot available).
        """
        if self.duration is None or self.num_requests is None:
            return None

        history = self.history or []
        if history:
            remaining_duration = self.duration - (self.now - history[-1])
        else:
            remaining_duration = self.duration

        available_requests = self.num_requests - len(history) + 1
        if available_requests <= 0:
            return None
        return remaining_duration / float(available_requests)

    def reset(self):
        """
        Removes the stored history for this scope and ident from the cache.
        """
        key = self.get_cache_key()
        if key is not None:
            self.cache.delete(key)
class CompoundThrottle:
    """
    Wrapper which allows using a few throttles like one. The example below gives
    10 login attempts in an hour to user "john" but not more than 50 attempts a day.

        throttle = CompoundThrottle(
            RateLimitThrottle(scope='login', rate='10/hour', ident='john'),
            RateLimitThrottle(scope='login', rate='50/day', ident='john'),
        )
        throttle.is_limit_reached(raise_exception=True)
        throttle.register_attempt()

    NOTE(review): both throttles in the example use the same scope and ident;
    if the wrapped throttle builds its cache key from those two values only,
    they share one stored history - confirm the longer limit is effective.
    """

    def __init__(self, *throttles):
        self.throttles = throttles

    def is_limit_reached(self, *args, **kwargs):
        """
        Returns True as soon as any wrapped throttle reports that its limit
        is reached, False if none does. Arguments are passed through to each
        throttle's ``is_limit_reached``; throttles after the first one that
        reports True are not consulted.
        """
        return any(
            throttle.is_limit_reached(*args, **kwargs)
            for throttle in self.throttles
        )

    def register_attempt(self):
        """
        Registers the attempt on every wrapped throttle, once per cache key.

        Throttles that resolve to the same cache key would write the same
        entry twice, so only the first of them is registered. Throttles with
        the same scope but a different ident have different keys and are
        each registered.
        """
        seen_keys = set()
        for throttle in self.throttles:
            key = throttle.get_cache_key()
            if key is not None:
                if key in seen_keys:
                    continue
                seen_keys.add(key)
            throttle.register_attempt()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment