Last active
May 22, 2016 00:37
-
-
Save daboross/61e34fbf2e4661ecad17 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from fnmatch import fnmatch | |
from time import time | |
from util import hook | |
class TokenBucket(object):
    """An implementation of the token bucket algorithm.

    The bucket starts full and is refilled lazily: no background timer runs,
    the balance is simply recomputed from the elapsed wall-clock time every
    time the ``tokens`` property is read.

    >>> bucket = TokenBucket(80, 0.5)
    >>> bucket.consume(10)
    True
    >>> bucket.consume(90)
    False
    """

    def __init__(self, tokens, fill_rate):
        """Create a full bucket.

        tokens is the total number of tokens the bucket can hold (and its
        initial balance). fill_rate is the rate, in tokens/second, at which
        the bucket is refilled.
        """
        self.capacity = float(tokens)
        self._tokens = float(tokens)
        self.fill_rate = float(fill_rate)
        self.timestamp = time()

    def consume(self, tokens):
        """Consume tokens from the bucket.

        Returns True if there were sufficient tokens (and deducts them),
        otherwise returns False and leaves the balance untouched.
        """
        # Reading self.tokens first applies any pending refill.
        if tokens <= self.tokens:
            self._tokens -= tokens
            return True
        return False

    def refill(self):
        """Reset the bucket to full capacity immediately."""
        self._tokens = self.capacity

    def get_tokens(self):
        """Return the current balance after applying the pending refill."""
        now = time()
        if self._tokens < self.capacity:
            # time() is wall-clock time and may step backwards (e.g. a clock
            # adjustment). Clamp the interval so a backwards step can never
            # produce a negative refill that drains the bucket.
            elapsed = max(0.0, now - self.timestamp)
            refilled = self._tokens + self.fill_rate * elapsed
            self._tokens = min(self.capacity, refilled)
        # Always advance the reference point, even when the bucket is full,
        # so time spent at capacity is not credited again after a consume.
        self.timestamp = now
        return self._tokens

    tokens = property(get_tokens)
# Rate-limiting configuration, used when a new per-nick bucket is created.
#
# Size of each bucket; a new bucket starts out full.
command_limiting_initial_tokens = 10
# Refill speed in tokens per second (a full command's cost of 4 tokens
# takes roughly 31 seconds to regenerate at 0.13 tokens/second).
command_limiting_restore_rate = 0.13
# Tokens deducted for every command that passes through the sieve.
command_limiting_message_cost = 4

# Registry of TokenBucket instances, keyed by the nick that issued a command.
buckets = dict()
@hook.sieve
def sieve_suite(bot, input, func, kind, args):
    """Rate-limit commands per nick using a token bucket.

    Non-command events pass through untouched. Users whose hostmask matches
    a permission group holding one of the bypass permissions are never
    limited. Everyone else spends ``command_limiting_message_cost`` tokens
    from their own bucket per command.

    Returns ``input`` when the command may proceed, or ``None`` (after
    sending the user a notice) when the command is rate-limited.
    """
    if kind != "command":
        return input

    allowed_permissions = ["adminonly"]  # set this to permissions to bypass bucket
    mask = input.mask.lower()

    # The fallback must be a mapping: a list has no .values()/.iteritems(),
    # so a config without a "permissions" section used to raise
    # AttributeError on every command.
    for group in bot.config.get("permissions", {}).values():
        # skip groups that hold none of the bypass permissions
        if not any(permission in group["perms"] for permission in allowed_permissions):
            continue
        # the group qualifies; see whether this user belongs to it
        for pattern in group["users"]:
            if fnmatch(mask, pattern.lower()):
                return input

    # Fetch this nick's bucket, creating a full one on first use.
    bucket = buckets.get(input.nick)
    if bucket is None:
        bucket = TokenBucket(command_limiting_initial_tokens, command_limiting_restore_rate)
        buckets[input.nick] = bucket

    if bucket.consume(command_limiting_message_cost):
        return input

    input.notice("Command rate-limited, please try again in a few seconds.")
    return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment