gregburek/1441055
import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args, **kargs):
            # time.time() is wall-clock time; time.clock() measured CPU time on Unix
            # (so sleeping did not count as elapsed) and was removed in Python 3.8.
            elapsed = time.time() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait > 0:
                time.sleep(leftToWait)
            ret = func(*args, **kargs)
            lastTimeCalled[0] = time.time()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print(num)

if __name__ == "__main__":
    print("This should print 1,2,3... at about 2 per second.")
    for i in range(1, 100):
        PrintNumber(i)
Multi-threading issues occur in both of the lock-based versions posted above: last_time_called is updated after the lock has been released. That leads to unpredictable fluctuations in the rate limit when the decorated function is called from multiple threads. The final fixed version that is being used in production:
import threading
import time
from functools import wraps

def rate_limited(max_per_second):
    lock = threading.Lock()
    min_interval = 1.0 / max_per_second
    def decorate(func):
        last_time_called = time.perf_counter()
        @wraps(func)
        def rate_limited_function(*args, **kwargs):
            # Hold the lock across the wait, the call and the timestamp update
            lock.acquire()
            nonlocal last_time_called
            elapsed = time.perf_counter() - last_time_called
            left_to_wait = min_interval - elapsed
            if left_to_wait > 0:
                time.sleep(left_to_wait)
            ret = func(*args, **kwargs)
            last_time_called = time.perf_counter()
            lock.release()
            return ret
        return rate_limited_function
    return decorate
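You can check the claim about threads empirically. The sketch below re-implements the fixed version above in compact form, using `with lock:` in place of explicit acquire/release. It calls the limiter from four threads and checks that consecutive recorded calls are at least 1/max_per_second apart. `record_call` and `call_times` are hypothetical names used only for this check.

```python
import threading
import time
from functools import wraps


def rate_limited(max_per_second):
    """Compact variant of the version above: wait, call and timestamp all under one lock."""
    lock = threading.Lock()
    min_interval = 1.0 / max_per_second

    def decorate(func):
        last_time_called = 0.0

        @wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal last_time_called
            with lock:
                elapsed = time.perf_counter() - last_time_called
                if elapsed < min_interval:
                    time.sleep(min_interval - elapsed)
                ret = func(*args, **kwargs)
                # Updated while the lock is still held, so no other thread sees a stale value
                last_time_called = time.perf_counter()
                return ret
        return wrapper
    return decorate


call_times = []


@rate_limited(20)  # at most one call every 0.05 s
def record_call():
    call_times.append(time.perf_counter())


def caller():
    for _ in range(3):
        record_call()


threads = [threading.Thread(target=caller) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

gaps = [b - a for a, b in zip(call_times, call_times[1:])]
print(len(call_times), "calls, smallest gap: {:.3f}s".format(min(gaps)))
```

If the timestamp were instead updated after the lock is released, another thread could read the old value and skip its wait. That is the fluctuation described above.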
Note: the nonlocal keyword is used, so this code is only compatible with Python 3.
@sintezcs great work
love it
The version posted by @sintezcs will not release the lock if func raises an exception.
import threading
import time
from functools import wraps

def rate_limited(max_per_second: int):
    """Rate-limits the decorated function locally, for one process."""
    lock = threading.Lock()
    min_interval = 1.0 / max_per_second
    def decorate(func):
        last_time_called = time.perf_counter()
        @wraps(func)
        def rate_limited_function(*args, **kwargs):
            lock.acquire()
            nonlocal last_time_called
            try:
                elapsed = time.perf_counter() - last_time_called
                left_to_wait = min_interval - elapsed
                if left_to_wait > 0:
                    time.sleep(left_to_wait)
                return func(*args, **kwargs)
            finally:
                # Runs even if func raises, so the lock is always released
                last_time_called = time.perf_counter()
                lock.release()
        return rate_limited_function
    return decorate
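The difference the try/finally makes can be shown in isolation. This minimal sketch calls a function that raises (the hypothetical `failing_call`) under both patterns. It then probes the lock with a non-blocking acquire to see whether it was left held. `with lock:` is equivalent shorthand for the try/finally form.

```python
import threading


def call_without_finally(lock, func):
    # Earlier pattern: release() is skipped if func() raises.
    lock.acquire()
    ret = func()
    lock.release()
    return ret


def call_with_finally(lock, func):
    # Pattern above: finally always releases the lock.
    lock.acquire()
    try:
        return func()
    finally:
        lock.release()


def failing_call():
    raise RuntimeError("func failed")


def lock_is_free_after_failure(caller):
    """Run `caller` with a function that raises, then report whether the lock was released."""
    lock = threading.Lock()
    try:
        caller(lock, failing_call)
    except RuntimeError:
        pass
    free = lock.acquire(blocking=False)  # non-blocking, so a leaked lock cannot hang us
    if free:
        lock.release()
    return free


print(lock_is_free_after_failure(call_without_finally))  # False: later blocking acquires would hang
print(lock_is_free_after_failure(call_with_finally))     # True
```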
This is great. I can't see any mention of a license, though, which means:
Code without an explicit license is protected by copyright and is by default All Rights Reserved. The person or people who wrote the code are protected as such. Any time you're using software you didn't write, licensing should be considered and abided. - Brian Doll, GitHub's VP of Marketing
i.e. using the code would be copyright infringement in most parts of the world, and so is risky... Any chance we could make it open source so that it can be used as well?
https://choosealicense.com/no-license/
At this stage, this would require the original poster, and anyone who has modified a given version (including intermediate versions), to declare licensing. Can I suggest the original poster makes a choice following this guidance here:
Maybe MIT? MIT-licensed code can of course be relicensed under other terms by modifiers if they want to, so that their modifications fall under copyleft (e.g. MIT -> GPL, but not the other way around).
Thanks!
Tim.
Thanks, team, for sharing this. I've built a class for use in multiprocessing mode. Hope it helps others.
- I'm using time.time()
- I'm using multiprocessing instead of threading
- It's compatible with Python 2.7 and 3.6
"""
by oPromessa, 2017
Published on https://github.com/oPromessa/flickr-uploader/
Inspired by: https://gist.github.com/gregburek/1441055
Helper class and functions to rate-limit function calls with Python decorators
MIT License
Copyright (c) 2017 oPromessa
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
# ----------------------------------------------------------------------------
# Import section for Python 2 and 3 compatible code
# from __future__ import absolute_import, division, print_function, unicode_literals
from __future__ import division # This way: 3 / 2 == 1.5; 3 // 2 == 1
# ----------------------------------------------------------------------------
# Import section
#
import sys
import logging
import multiprocessing
import time
from functools import wraps
# -----------------------------------------------------------------------------
# class LastTime to be used with rate_limited
#
class LastTime:
    """
    >>> import rate_limited as rt
    >>> a = rt.LastTime()
    >>> a.add_cnt()
    >>> a.get_cnt()
    1
    >>> a.add_cnt()
    >>> a.get_cnt()
    2
    """

    def __init__(self, name='LT'):
        # Init variables to None
        self.name = name
        self.ratelock = None
        self.cnt = None
        self.last_time_called = None

        # Instantiate control variables
        self.ratelock = multiprocessing.Lock()
        self.cnt = multiprocessing.Value('i', 0)
        self.last_time_called = multiprocessing.Value('d', 0.0)

        logging.debug('\t__init__: name=[{!s}]'.format(self.name))

    def acquire(self):
        self.ratelock.acquire()

    def release(self):
        self.ratelock.release()

    def set_last_time_called(self):
        self.last_time_called.value = time.time()
        # self.debug('set_last_time_called')

    def get_last_time_called(self):
        return self.last_time_called.value

    def add_cnt(self):
        self.cnt.value += 1

    def get_cnt(self):
        return self.cnt.value

    def debug(self, debugname='LT'):
        now = time.time()
        logging.debug('___Rate name:[{!s}] '
                      'debug=[{!s}] '
                      '\n\t cnt:[{!s}] '
                      '\n\tlast_called:{!s} '
                      '\n\t timenow():{!s} '
                      .format(self.name,
                              debugname,
                              self.cnt.value,
                              time.strftime(
                                  '%T.{}'
                                  .format(str(self.last_time_called.value -
                                              int(self.last_time_called.value))
                                          .split('.')[1][:3]),
                                  time.localtime(self.last_time_called.value)),
                              time.strftime(
                                  '%T.{}'
                                  .format(str(now -
                                              int(now))
                                          .split('.')[1][:3]),
                                  time.localtime(now))))
# -----------------------------------------------------------------------------
# rate_limited
#
# rate-limits execution of a function; the LastTime counters are shared
# with child processes that inherit them (e.g. via fork)
def rate_limited(max_per_second):

    min_interval = 1.0 / max_per_second
    LT = LastTime('rate_limited')

    def decorate(func):
        LT.acquire()
        if LT.get_last_time_called() == 0:
            LT.set_last_time_called()
        LT.debug('DECORATE')
        LT.release()

        @wraps(func)
        def rate_limited_function(*args, **kwargs):

            logging.warning('___Rate_limited f():[{!s}]: '
                            'Max_per_Second:[{!s}]'
                            .format(func.__name__, max_per_second))

            try:
                LT.acquire()
                LT.add_cnt()
                xfrom = time.time()

                elapsed = xfrom - LT.get_last_time_called()
                left_to_wait = min_interval - elapsed
                logging.debug('___Rate f():[{!s}] '
                              'cnt:[{!s}] '
                              '\n\tlast_called:{!s} '
                              '\n\t time now():{!s} '
                              'elapsed:{:6.2f} '
                              'min:{!s} '
                              'to_wait:{:6.2f}'
                              .format(func.__name__,
                                      LT.get_cnt(),
                                      time.strftime(
                                          '%T',
                                          time.localtime(
                                              LT.get_last_time_called())),
                                      time.strftime('%T',
                                                    time.localtime(xfrom)),
                                      elapsed,
                                      min_interval,
                                      left_to_wait))

                if left_to_wait > 0:
                    time.sleep(left_to_wait)

                ret = func(*args, **kwargs)

                LT.debug('OVER')
                LT.set_last_time_called()
                LT.debug('NEXT')

            except Exception as ex:
                # CODING: To be changed once reportError is on a module
                sys.stderr.write('+++000 '
                                 'Exception on rate_limited_function: [{!s}]\n'
                                 .format(ex))
                sys.stderr.flush()
                # reportError(Caught=True,
                #             CaughtPrefix='+++',
                #             CaughtCode='000',
                #             CaughtMsg='Exception on rate_limited_function',
                #             exceptUse=True,
                #             # exceptCode=ex.code,
                #             exceptMsg=ex,
                #             NicePrint=False,
                #             exceptSysInfo=True)
                raise

            finally:
                LT.release()
            return ret

        return rate_limited_function

    return decorate
# -----------------------------------------------------------------------------
# Samples
# @rate_limited(5)  # 5 calls per second
# def print_num(num):
#     print(num)
# -----------------------------------------------------------------------------
# If called directly run doctests
#
if __name__ == "__main__":

    logging.basicConfig(level=logging.DEBUG,
                        format='[%(asctime)s]:[%(processName)-11s]' +
                        '[%(levelname)-8s]:[%(name)s] %(message)s')

    import doctest
    doctest.testmod()

    # Comment following line to allow further debugging/testing
    # sys.exit(0)

    # n for n calls per second (ex. 3 means 3 calls per second)
    # 1/n for n seconds per call (ex. 0.5 means 2 seconds in between calls)
    @rate_limited(1)
    def print_num(prc, num):
        """
        """
        print('\t\t***prc:[{!s}] num:[{!s}] '
              'rate_limit timestamp:[{!s}]'
              .format(prc, num, time.strftime('%T')))

    print('-------------------------------------------------Single Processing')
    for process in range(1, 3):
        for j in range(1, 2):
            print_num(process, j)

    print('-------------------------------------------------Multi Processing')

    # fmulti and print_num are only defined under this __main__ guard, so the
    # child processes below rely on the 'fork' start method to inherit them.
    def fmulti(x, prc):
        import random

        for i in range(1, x):
            r = random.randrange(6)
            print('\t\t[prc:{!s}] [{!s}]'
                  '->- WORKing {!s}s----[{!s}]'
                  .format(prc, i, r, time.strftime('%T')))
            time.sleep(r)
            print('\t\t[prc:{!s}] [{!s}]--> Before:---[{!s}]'
                  .format(prc, i, time.strftime('%T')))
            print_num(prc, i)
            print('\t\t[prc:{!s}] [{!s}]<-- After----[{!s}]'
                  .format(prc, i, time.strftime('%T')))

    TaskPool = []
    for j in range(1, 4):
        Task = multiprocessing.Process(target=fmulti, args=(5, j))
        TaskPool.append(Task)
        Task.start()

    for j in TaskPool:
        print('{!s}.is_alive = {!s}'.format(j.name, j.is_alive()))

    while (True):
        if not (any(multiprocessing.active_children())):
            print('===No active children Processes.')
            break
        for p in multiprocessing.active_children():
            print('==={!s}.is_alive = {!s}'.format(p.name, p.is_alive()))
            uploadTaskActive = p
        print('===Will wait for 60 on {!s}.is_alive = {!s}'
              .format(uploadTaskActive.name,
                      uploadTaskActive.is_alive()))
        uploadTaskActive.join(timeout=60)
        print('===Waited for 60s on {!s}.is_alive = {!s}'
              .format(uploadTaskActive.name,
                      uploadTaskActive.is_alive()))

    # Wait for join all jobs/tasks in the Process Pool
    # All should be done by now!
    for j in TaskPool:
        j.join()
        print('==={!s} (is alive: {!s}).exitcode = {!s}'
              .format(j.name, j.is_alive(), j.exitcode))
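The module above creates its LastTime state when `rate_limited` runs, and its demo depends on child processes inheriting that state through fork. The sketch below applies the same shared-state idea under the 'spawn' start method as well (the default on Windows and macOS), assuming that is a requirement. It creates the lock and the timestamp once in the parent and passes them to each worker through the `Process` arguments. `wait_for_slot`, `worker` and `run_demo` are hypothetical names, not part of oPromessa's module. One behavioural difference: this variant timestamps the moment a slot is claimed, not the end of the call.

```python
import multiprocessing
import time


def wait_for_slot(lock, last_time_called, min_interval):
    """Block until min_interval seconds have passed since the last claimed slot, then claim one.

    `lock` is a multiprocessing.Lock and `last_time_called` a multiprocessing.Value('d'),
    both shared by every participating process.
    """
    with lock:
        elapsed = time.time() - last_time_called.value
        left_to_wait = min_interval - elapsed
        if left_to_wait > 0:
            time.sleep(left_to_wait)
        # Stamp the start of this slot (the module above stamps the end of the call).
        last_time_called.value = time.time()


def worker(lock, last_time_called, min_interval, n_calls):
    """Top-level function, so spawned children can import it by name."""
    for i in range(n_calls):
        wait_for_slot(lock, last_time_called, min_interval)
        print('[{}] call {} at {:.2f}'.format(
            multiprocessing.current_process().name, i, time.time()))


def run_demo(n_workers=2, min_interval=0.2, n_calls=2):
    # Shared state is created once here and handed to each child explicitly.
    lock = multiprocessing.Lock()
    last_time_called = multiprocessing.Value('d', 0.0)
    procs = [multiprocessing.Process(target=worker,
                                     args=(lock, last_time_called, min_interval, n_calls))
             for _ in range(n_workers)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()


if __name__ == '__main__':
    run_demo()
```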
@oPromessa thanks for sharing and nice job on the logging and example.
Why don't you use a counter instead of a list? Each time the function is called, increment the counter by 1, and reset it to 0 when the second or minute has changed.
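The counter suggestion is usually called a fixed-window limiter. Here is a minimal sketch, assuming windows aligned to the wall clock (a new window starts whenever `time.time() // window_seconds` changes). `FixedWindowLimiter` and `print_number` are hypothetical names. Unlike the decorators above, it allows up to `max_calls` calls per window, not one call per interval.

```python
import threading
import time
from functools import wraps


class FixedWindowLimiter:
    """Allow at most `max_calls` per window of `window_seconds`; the counter resets
    when the clock moves into a new window."""

    def __init__(self, max_calls, window_seconds=1.0):
        self.max_calls = max_calls
        self.window_seconds = window_seconds
        self.lock = threading.Lock()
        self.current_window = None  # index of the window the counter belongs to
        self.count = 0

    def acquire(self):
        """Block until a call is allowed, then count it."""
        with self.lock:
            while True:
                now = time.time()
                window = int(now // self.window_seconds)
                if window != self.current_window:
                    # The second (or minute, etc.) has changed: reset the counter.
                    self.current_window = window
                    self.count = 0
                if self.count < self.max_calls:
                    self.count += 1
                    return
                # This window's budget is spent: sleep until the next window starts.
                time.sleep(max(0.0, (window + 1) * self.window_seconds - now))

    def __call__(self, func):
        """Allow an instance to be used as a decorator."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            self.acquire()
            return func(*args, **kwargs)
        return wrapper


@FixedWindowLimiter(max_calls=2, window_seconds=1.0)
def print_number(num):
    print(num)
```

One trade-off: because windows are fixed, up to 2 × max_calls calls can run close together around a window boundary (the end of one window plus the start of the next).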
Any updates regarding licensing of these pieces of code?
Note that there are other libraries on PyPI that do this, such as ratelimit.
A simpler version, but with async compatibility: https://gist.github.com/hugokernel/c478d639849e1e772b1395a546100031
I created a similar asyncio version, but using the token bucket algorithm to limit the maximum number of calls per time interval, which gives a maximum burst rate. It can be combined with the rate limiter above, which has no burst rate.
https://gist.github.com/yeus/dff02dce88c6da9073425b5309f524dd
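For reference, a minimal token-bucket sketch for asyncio. It assumes the usual formulation: a bucket holds up to `capacity` tokens and refills at `rate` tokens per second. This is not the code from the linked gist, and `AsyncTokenBucket` is a hypothetical name. Create the bucket inside the running event loop (as `demo` does) so its `asyncio.Lock` belongs to that loop on older Python versions.

```python
import asyncio
import time


class AsyncTokenBucket:
    """Holds up to `capacity` tokens, refilled at `rate` tokens per second.

    Each acquire() spends one token, so up to `capacity` calls can burst through
    at once; after that, callers are paced to `rate` calls per second.
    """

    def __init__(self, rate, capacity):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)  # start full, allowing an initial burst
        self.updated = time.monotonic()
        self.lock = asyncio.Lock()     # serialises waiters so tokens are handed out in order

    def _refill(self):
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now

    async def acquire(self):
        async with self.lock:
            self._refill()
            while self.tokens < 1:
                # Sleep roughly until one whole token has accumulated, then re-check.
                await asyncio.sleep((1 - self.tokens) / self.rate)
                self._refill()
            self.tokens -= 1


async def demo():
    bucket = AsyncTokenBucket(rate=2, capacity=3)  # bursts of 3, then 2 calls per second
    start = time.monotonic()
    for i in range(6):
        await bucket.acquire()
        print('call {} at {:.2f}s'.format(i, time.monotonic() - start))


if __name__ == '__main__':
    asyncio.run(demo())
```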
An extended version of @oPromessa's utility. Thank you for sharing, @oPromessa! I added Python 3.12 generic type hinting and support for recursive calls using an RLock:
from typing import Callable, cast
from functools import wraps
import threading
import time

def rate_limited[T, **P](max_per_second: int) -> Callable[[Callable[P, T]], Callable[P, T]]:
    def decorator(fn: Callable[P, T]) -> Callable[P, T]:
        lock = threading.RLock()
        min_interval = 1.0 / max_per_second
        last_time_called = 0.0

        @wraps(fn)
        def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            with lock:
                nonlocal last_time_called
                elapsed = time.perf_counter() - last_time_called
                left_to_wait = min_interval - elapsed
                if left_to_wait > 0:
                    time.sleep(left_to_wait)
                last_time_called = time.perf_counter()
                return fn(*args, **kwargs)

        return cast(Callable[P, T], wrapper)

    return decorator
The list is used because last_time_called is not a local variable of the inner function, so it can't be assigned to there unless nonlocal is used. However, nonlocal is new in Python 3. To modify last_time_called in Python 2.x, the workaround is to store the value in a list and modify the list's element instead.
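The workaround can be seen in a small counter closure. The function names here are hypothetical. Assigning to an enclosing variable makes it local to the inner function and fails. Mutating a list element never rebinds the name, so it works in Python 2 and 3. `nonlocal` is the Python 3 way to rebind the enclosing variable directly.

```python
def make_counter_broken():
    count = 0
    def increment():
        count += 1  # assignment makes `count` local to increment -> UnboundLocalError
        return count
    return increment


def make_counter_py2_style():
    count = [0]  # mutable cell: we change its element and never rebind the name
    def increment():
        count[0] += 1
        return count[0]
    return increment


def make_counter_py3():
    count = 0
    def increment():
        nonlocal count  # Python 3 only: rebinds the variable in the enclosing scope
        count += 1
        return count
    return increment


try:
    make_counter_broken()()
except UnboundLocalError as exc:
    print("UnboundLocalError:", exc)

increment = make_counter_py2_style()
print(increment(), increment())  # 1 2
```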