Skip to content

Instantly share code, notes, and snippets.

@tkhoa2711
Last active August 29, 2015 14:20
Show Gist options
  • Save tkhoa2711/d7bfbe9161fe03512401 to your computer and use it in GitHub Desktop.
Save tkhoa2711/d7bfbe9161fe03512401 to your computer and use it in GitHub Desktop.
A simple decorator for throttle functionality in Python
import time, platform
from collections import defaultdict
from functools import wraps
get_time = time.clock if platform.system() == 'Windows' else time.time
__nop = lambda: None
__throttle_last_time = defaultdict(__nop)
def throttle(duration=1, **kw):
"""
Throttle a function on a duration. Prevent it to be called
more than once within a period.
For example:
# normal use case
@throttle(5)
def echo(msg):
print "echo %s" % msg
# more sophisticated use case
def _update_status():
import datetime
print "last updated: %s" % datetime.datetime.now()
@throttle(duration=10, on_throttling=_update_status)
def echo(msg):
print "echo %s" % msg
@param duration: the number of seconds for throttling period
@type duration: int or float
@param on_throttling: a function to be called during the throttle period
@type on_throttling: function or callable
"""
on_throttling = kw.pop("on_throttling", __nop)
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
last_time = __throttle_last_time[func]
if last_time is None or get_time() >= last_time + duration:
__throttle_last_time[func] = get_time()
return func(*args, **kwargs)
else:
on_throttling()
return wrapper
return decorator
def _update_status():
import datetime
print "last updated: %s" % datetime.datetime.now()
@throttle(0.00001, on_throttling=_update_status)
def echo(msg):
print "echo: %s" % msg
if __name__ == "__main__":
echo(1)
echo(2)
echo(3)
import time
time.sleep(1)
echo (4)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment