Skip to content

Instantly share code, notes, and snippets.

@bsolomon1124
Last active August 23, 2018 01:03
Show Gist options
  • Select an option

  • Save bsolomon1124/8a0995daf6334d06f9eafab1d84b0a53 to your computer and use it in GitHub Desktop.

Select an option

Save bsolomon1124/8a0995daf6334d06f9eafab1d84b0a53 to your computer and use it in GitHub Desktop.
Rate limit manager via Redis/Python
#!/usr/bin/env python3
# NOTE: No Python 2 compat. Requires the third-party `redis` package (redis-py).
"""Mixin class for managing rate limiting of API keys via Redis."""
__all__ = ['RateLimitManagerMixin']
import datetime
import redis
# Defaults used by RateLimitManagerMixin.__init__ when the caller does not
# supply its own window length / call ceiling.
_DEFAULT_TD_WINDOW = datetime.timedelta(minutes=15)
_DEFAULT_MAX_CALLS = 170

# Field values written to a key's hash when the key is first seen, or once
# its rate-limit window has rolled off.
RESET_MAP = {
    b'calls': 0,      # calls recorded in the current window
    b'lastreset': 0,  # POSIX time of the window's first call; 0 = none yet
}
def timestamp():
    """Return the current POSIX timestamp as whole seconds (int) since epoch.

    The fractional part of the current time is truncated.
    """
    # Ask for a timezone-aware UTC datetime directly.  ``datetime.utcnow()``
    # is deprecated since Python 3.12 and returns a naive value that has to
    # be patched with ``replace(tzinfo=...)`` before ``.timestamp()`` is
    # correct.
    now = datetime.datetime.now(tz=datetime.timezone.utc)
    return int(now.timestamp())
class RateLimitManagerMixin(object):
    """Mixin class for managing rate limiting of API keys via Redis.

    Every tracked key is stored as a Redis hash with two integer fields:

    * ``b'calls'``     -- number of calls recorded in the current window.
    * ``b'lastreset'`` -- POSIX timestamp written when the first call of
      the current window is recorded; ``0`` when no call has been
      recorded since the entry was (re)initialized.
    """

    def __init__(self,
                 reset_window=_DEFAULT_TD_WINDOW,
                 max_calls=_DEFAULT_MAX_CALLS,
                 client=redis.StrictRedis,
                 **kwargs):
        """Set up the Redis client and the rate-limit parameters.

        :param reset_window: window length, either a ``datetime.timedelta``
            or a number of seconds; stored as an ``int`` number of seconds.
        :param max_calls: number of calls allowed on a key per window.
        :param client: either an existing ``redis.StrictRedis`` instance
            (used as-is; ``kwargs`` are then ignored) or a callable that
            is invoked as ``client(**kwargs)`` to build the client.
        :param kwargs: forwarded to ``client`` when it is not already a
            ``redis.StrictRedis`` instance.
        """
        if isinstance(client, redis.StrictRedis):
            self._client = client
        else:
            self._client = client(**kwargs)
        self.max_calls = max_calls
        if isinstance(reset_window, datetime.timedelta):
            reset_window = reset_window.total_seconds()
        self.reset_window = int(reset_window)
        # NOTE: do *not* edit RESPONSE_CALLBACKS
        # We don't know if there are other unrelated key/values
        # whose retrieval type we don't want to inadvertently affect

    # NOTE: these are specialized methods to this particular mixin.
    # Don't try to re-generalize them to any redis store.

    def _reset(self, name, mapping=RESET_MAP):
        """Overwrite the hash fields of ``name`` with ``mapping``.

        With the default mapping this zeroes both ``b'calls'`` and
        ``b'lastreset'``.
        """
        # NOTE(review): newer redis-py releases deprecate ``hmset`` in
        # favour of ``hset(name, mapping=...)``; kept as-is so older
        # client versions keep working -- confirm the target version.
        self._client.hmset(name=name, mapping=mapping)

    def hgetall(self, name):
        """Return ``{b'calls': int, b'lastreset': int}`` for ``name``.

        A ``str`` name is UTF-8 encoded before the lookup.  If the key
        does not exist, or its window has expired (more than
        ``self.reset_window`` seconds since ``b'lastreset'``), the entry
        is reset in Redis and a fresh copy of ``RESET_MAP`` is returned.
        """
        if isinstance(name, str):
            name = name.encode('utf-8')
        raw = self._client.hgetall(name)
        if not raw:
            # Key does not exist at all; initialize an entry for it.
            # Yes, this is hiding a setter in a getter.
            # Again--only suited for the task at hand
            self._reset(name)
            # Return a copy: handing out the module-level dict itself
            # would let a caller's mutation corrupt every later reset.
            return dict(RESET_MAP)

        data = {field: int(value) for field, value in raw.items()}
        last_reset = data[b'lastreset']
        if last_reset == 0:
            # No call recorded in this window yet; nothing can expire.
            return data
        if timestamp() - last_reset > self.reset_window:
            # If we are past the window, reset anyway
            self._reset(name)
            return dict(RESET_MAP)
        return data

    def hincr(self, name):
        """Record one call against ``name``.

        Increments ``b'calls'`` by one.  When this is the first call of a
        window (the count read beforehand is 0), ``b'lastreset'`` is set
        to the current timestamp, which starts the window.
        """
        # NOTE(review): the read and the two writes below are separate
        # Redis commands, not a transaction.
        now = timestamp()
        data = self.hgetall(name)
        self._client.hincrby(name=name, key=b'calls')
        if data[b'calls'] == 0:
            self._client.hset(name, key=b'lastreset', value=now)

    def calls_on_this_key(self, name):
        """Return the number of calls recorded for ``name`` this window."""
        return self.hgetall(name)[b'calls']

    def key_last_reset(self, name):
        """Return the ``b'lastreset'`` timestamp of ``name`` (0 if unset)."""
        return self.hgetall(name)[b'lastreset']

    def keys(self):
        """Return every key the client's ``keys()`` call reports.

        No pattern is passed, so this is not restricted to rate-limit
        entries.
        """
        return self._client.keys()

    def is_rate_limited(self, name):
        """Return True once ``name`` has used up its ``max_calls`` calls.

        The comparison is ``>=``: with a strict ``>`` a key would only be
        reported as limited after ``max_calls + 1`` calls had been
        recorded, i.e. one call over the configured ceiling.
        """
        return self.calls_on_this_key(name) >= self.max_calls
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment