Skip to content

Instantly share code, notes, and snippets.

@daboross
Last active May 22, 2016 00:37
Show Gist options
  • Save daboross/61e34fbf2e4661ecad17 to your computer and use it in GitHub Desktop.
Save daboross/61e34fbf2e4661ecad17 to your computer and use it in GitHub Desktop.
from fnmatch import fnmatch
from time import time
from util import hook
class TokenBucket(object):
"""An implementation of the token bucket algorithm.
>>> bucket = TokenBucket(80, 0.5)
>>> print bucket.consume(10)
True
>>> print bucket.consume(90)
False
"""
def __init__(self, tokens, fill_rate):
"""tokens is the total tokens in the bucket. fill_rate is the
rate in tokens/second that the bucket will be refilled."""
self.capacity = float(tokens)
self._tokens = float(tokens)
self.fill_rate = float(fill_rate)
self.timestamp = time()
def consume(self, tokens):
"""Consume tokens from the bucket. Returns True if there were
sufficient tokens otherwise False."""
if tokens <= self.tokens:
self._tokens -= tokens
else:
return False
return True
def refill(self):
self._tokens = self.capacity
def get_tokens(self):
now = time()
if self._tokens < self.capacity:
delta = self.fill_rate * (now - self.timestamp)
self._tokens = min(self.capacity, self._tokens + delta)
self.timestamp = now
return self._tokens
tokens = property(get_tokens)
command_limiting_initial_tokens = 10
command_limiting_restore_rate = 0.13
command_limiting_message_cost = 4
buckets = {}
@hook.sieve
def sieve_suite(bot, input, func, kind, args):
if kind != "command":
return input
allowed_permissions = ["adminonly"] # set this to permissions to bypass bucket
mask = input.mask.lower()
# loop over every group
for key, group in bot.config.get("permissions", []).iteritems():
# loop over every permission the command allows
for permission in allowed_permissions:
# see if the group has that permission
if permission in group["perms"]:
# if so, check it
group_users = [_mask.lower() for _mask in group["users"]]
for pattern in group_users:
if fnmatch(mask, pattern):
return input
if input.nick not in buckets:
bucket = TokenBucket(command_limiting_initial_tokens, command_limiting_restore_rate)
buckets[input.nick] = bucket
else:
bucket = buckets[input.nick]
if bucket.consume(command_limiting_message_cost):
return input
input.notice("Command rate-limited, please try again in a few seconds.")
return None
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment