-
-
Save mapix/ec8c944ed6c3641941a32af4f2f4ec4c to your computer and use it in GitHub Desktop.
Python generator rate limiter using token bucket
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from time import time, sleep | |
# Convenience byte counts for use as bandwidth/burst arguments.
# ``n << 10`` is n * 1024, i.e. n KiB expressed in bytes.
_128k = 128 << 10
_256k = 256 << 10
_512k = 512 << 10
_1024k = 1024 << 10
class TokenBucket(object):
    """An implementation of the token bucket algorithm.

    The bucket starts full and is refilled continuously at ``fill_rate``
    tokens per second, never holding more than ``capacity`` tokens.

    >>> bucket = TokenBucket(80, 0.5)
    >>> bucket.consume(10)
    True

    adapted from http://code.activestate.com/recipes/511490-implementation-of-the-token-bucket-algorithm/?in=lang-python

    Not thread safe.
    """

    __slots__ = ['capacity', '_tokens', 'fill_rate', 'timestamp']

    def __init__(self, tokens, fill_rate):
        """tokens is the total tokens in the bucket. fill_rate is the
        rate in tokens/second that the bucket will be refilled."""
        self.capacity = float(tokens)
        self._tokens = float(tokens)
        self.fill_rate = float(fill_rate)
        # Moment of the last refill calculation (seconds, from time()).
        self.timestamp = time()

    def consume(self, tokens, block=True):
        """Consume tokens from the bucket. Returns True if there were
        sufficient tokens.

        If there are not enough tokens and block is True, sleeps until the
        bucket is replenished enough to satisfy the deficiency. If the
        bucket can never be replenished (fill_rate <= 0), returns False
        instead of waiting forever.

        If there are not enough tokens and block is False, returns False.

        It is an error to consume more tokens than the bucket capacity;
        doing so raises AssertionError.
        """
        if tokens > self.capacity:
            # Raised explicitly instead of via ``assert`` so the check is
            # still enforced when Python runs with -O; without it the
            # blocking loop below could never terminate.
            raise AssertionError(
                'Attempted to consume {} tokens from a bucket with capacity {}'
                .format(tokens, self.capacity))

        available = self.tokens
        if block:
            # Loop rather than sleep once: a single sleep can leave the
            # bucket marginally short (clock granularity, float rounding).
            while tokens > available:
                if self.fill_rate <= 0:
                    # No refill will ever happen; waiting is pointless.
                    return False
                sleep((tokens - available) / self.fill_rate)
                available = self.tokens

        if tokens > available:
            return False
        self._tokens -= tokens
        return True

    @property
    def tokens(self):
        """Number of tokens currently available, after applying any refill
        accrued since the previous check."""
        now = time()
        # time() is wall-clock time and may step backwards; never let that
        # turn into a negative refill.
        elapsed = max(0.0, now - self.timestamp)
        # Always advance the timestamp, even when the bucket is full, so
        # idle time spent at capacity is not credited after a later consume.
        self.timestamp = now
        if self._tokens < self.capacity:
            refill = self.fill_rate * elapsed
            self._tokens = min(self.capacity, self._tokens + refill)
        return self._tokens
class InfiniteTokenBucket(object):
    """A bucket that never runs out.

    Exposes the same ``consume`` method and ``tokens`` property as
    TokenBucket, but every request succeeds immediately and the reported
    balance is always infinite.
    """

    # Stateless: instances carry no attributes at all.
    __slots__ = ()

    def __init__(self, tokens=None, fill_rate=None):
        """Accept the TokenBucket constructor arguments and ignore them."""

    def consume(self, tokens, block=True):
        """Always report success; nothing is deducted and nothing sleeps."""
        return True

    @property
    def tokens(self):
        """The available balance, which is always positive infinity."""
        unlimited = float('infinity')
        return unlimited
def rate_limit(data, bandwidth_or_burst, steady_state_bandwidth=None):
    """Limit the bandwidth of a generator.

    Given a data generator, return a generator that yields the data at no
    higher than the specified bandwidth.  For example, ``rate_limit(data, _256k)``
    will yield from data at no higher than 256KB/s.

    The three argument form distinguishes burst from steady-state bandwidth,
    so ``rate_limit(data, _1024k, _128k)`` would allow data to be consumed at
    128KB/s with an initial burst of 1MB.

    Each item is charged by its size: ``len(item)`` for bytes/bytearray
    objects, ``len(str(item))`` for anything else.  An item larger than the
    burst size is charged in burst-sized instalments, so it is delayed
    proportionally instead of being rejected by the bucket.
    """
    bandwidth = steady_state_bandwidth or bandwidth_or_burst
    rate_limiter = TokenBucket(bandwidth_or_burst, bandwidth)
    # Largest amount the bucket accepts in a single consume() call.
    step = rate_limiter.capacity
    for thing in data:
        if isinstance(thing, (bytes, bytearray)):
            # str() of a bytes object on Python 3 is its repr ("b'...'"),
            # which would overstate the payload size.
            pending = len(thing)
        else:
            pending = len(str(thing))
        # Pay for oversized items one full bucket at a time.  When the
        # capacity is not positive the loop is skipped and the bucket
        # itself reports the error on the call below.
        while pending > step > 0:
            rate_limiter.consume(step)
            pending -= step
        rate_limiter.consume(pending)
        yield thing
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment