Created
August 5, 2013 12:08
-
-
Save drocco007/6155452 to your computer and use it in GitHub Desktop.
Python generator rate limiter using token bucket
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from time import time, sleep | |
_128k = 128 * 1024 | |
_256k = 256 * 1024 | |
_512k = 512 * 1024 | |
_1024k = 1024 * 1024 | |
class TokenBucket(object): | |
"""An implementation of the token bucket algorithm. | |
>>> bucket = TokenBucket(80, 0.5) | |
>>> print bucket.consume(10) | |
True | |
adapted from http://code.activestate.com/recipes/511490-implementation-of-the-token-bucket-algorithm/?in=lang-python | |
Not thread safe. | |
""" | |
__slots__ = ['capacity', '_tokens', 'fill_rate', 'timestamp'] | |
def __init__(self, tokens, fill_rate): | |
"""tokens is the total tokens in the bucket. fill_rate is the | |
rate in tokens/second that the bucket will be refilled.""" | |
self.capacity = float(tokens) | |
self._tokens = float(tokens) | |
self.fill_rate = float(fill_rate) | |
self.timestamp = time() | |
def consume(self, tokens, block=True): | |
"""Consume tokens from the bucket. Returns True if there were | |
sufficient tokens. | |
If there are not enough tokens and block is True, sleeps until the | |
bucket is replenished enough to satisfy the deficiency. | |
If there are not enough tokens and block is False, returns False. | |
It is an error to consume more tokens than the bucket capacity. | |
""" | |
assert tokens <= self.capacity, \ | |
'Attempted to consume {} tokens from a bucket with capacity {}' \ | |
.format(tokens, self.capacity) | |
if block and tokens > self.tokens: | |
deficit = tokens - self._tokens | |
delay = deficit / self.fill_rate | |
# print 'Have {} tokens, need {}; sleeping {} seconds'.format(self._tokens, tokens, delay) | |
sleep(delay) | |
if tokens <= self.tokens: | |
self._tokens -= tokens | |
return True | |
else: | |
return False | |
@property | |
def tokens(self): | |
if self._tokens < self.capacity: | |
now = time() | |
delta = self.fill_rate * (now - self.timestamp) | |
self._tokens = min(self.capacity, self._tokens + delta) | |
self.timestamp = now | |
return self._tokens | |
class InfiniteTokenBucket(object): | |
"""TokenBucket implementation with infinite capacity, i.e. consume always | |
returns True.""" | |
__slots__ = () | |
def __init__(self, tokens=None, fill_rate=None): | |
pass | |
def consume(self, tokens, block=True): | |
return True | |
@property | |
def tokens(self): | |
return float('infinity') | |
def rate_limit(data, bandwidth_or_burst, steady_state_bandwidth=None): | |
"""Limit the bandwidth of a generator. | |
Given a data generator, return a generator that yields the data at no | |
higher than the specified bandwidth. For example, ``rate_limit(data, _256k)`` | |
will yield from data at no higher than 256KB/s. | |
The three argument form distinguishes burst from steady-state bandwidth, | |
so ``rate_limit(data, _1024k, _128k)`` would allow data to be consumed at | |
128KB/s with an initial burst of 1MB. | |
""" | |
bandwidth = steady_state_bandwidth or bandwidth_or_burst | |
rate_limiter = TokenBucket(bandwidth_or_burst, bandwidth) | |
for thing in data: | |
rate_limiter.consume(len(str(thing))) | |
yield thing |
reformat
# -*- coding:utf-8 -*-
from time import time, sleep
_128k = 128 * 1024
_256k = 256 * 1024
_512k = 512 * 1024
_1024k = 1024 * 1024
class TokenBucket(object):
"""An implementation of the token bucket algorithm.
>>> bucket = TokenBucket(80, 0.5)
>>> print bucket.consume(10)
True
adapted from http://code.activestate.com/recipes/511490-implementation-of-the-token-bucket-algorithm/?in=lang-python
Not thread safe.
"""
__slots__ = ['capacity', '_tokens', 'fill_rate', 'timestamp']
def __init__(self, tokens, fill_rate):
"""tokens is the total tokens in the bucket. fill_rate is the
rate in tokens/second that the bucket will be refilled."""
self.capacity = float(tokens) # 容量
self._tokens = float(tokens)
self.fill_rate = float(fill_rate) # 产生速度
self.timestamp = time()
def consume(self, tokens, block=True):
"""Consume tokens from the bucket. Returns True if there were
sufficient tokens.
If there are not enough tokens and block is True, sleeps until the
bucket is replenished enough to satisfy the deficiency.
If there are not enough tokens and block is False, returns False.
It is an error to consume more tokens than the bucket capacity.
"""
assert tokens <= self.capacity, \
'Attempted to consume {} tokens from a bucket with capacity {}' \
.format(tokens, self.capacity)
if block and tokens > self.tokens:
deficit = tokens - self._tokens
delay = deficit / self.fill_rate
print('Have {} tokens, need {}; sleeping {} seconds'.format(self._tokens, tokens, delay))
sleep(delay)
if tokens <= self.tokens:
self._tokens -= tokens
return True
else:
return False
@property
def tokens(self):
if self._tokens < self.capacity:
now = time() # 获取当前时间
delta = self.fill_rate * (now - self.timestamp) # 算出这段时间产出的令牌
self._tokens = min(self.capacity, self._tokens + delta) # 丢弃超出容量的令牌
self.timestamp = now # 更新基准时间
return self._tokens
class InfiniteTokenBucket(object):
"""TokenBucket implementation with infinite capacity, i.e. consume always
returns True."""
__slots__ = ()
def __init__(self, tokens=None, fill_rate=None):
pass
def consume(self, tokens, block=True):
return True
@property
def tokens(self):
return float('infinity')
def rate_limit(data, bandwidth_or_burst, steady_state_bandwidth=None):
"""Limit the bandwidth of a generator.
Given a data generator, return a generator that yields the data at no
higher than the specified bandwidth. For example, ``rate_limit(data, _256k)``
will yield from data at no higher than 256KB/s.
The three argument form distinguishes burst from steady-state bandwidth,
so ``rate_limit(data, _1024k, _128k)`` would allow data to be consumed at
128KB/s with an initial burst of 1MB.
"""
bandwidth = steady_state_bandwidth or bandwidth_or_burst
rate_limiter = TokenBucket(bandwidth_or_burst, bandwidth)
for thing in data:
rate_limiter.consume(len(str(thing)))
yield thing
if __name__ == '__main__':
stream = rate_limit(range(10), 5, 1)
for i in stream:
print(time(), i)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
awful format..