Skip to content

Instantly share code, notes, and snippets.

@ryszard
Created August 13, 2009 12:56
Show Gist options
  • Select an option

  • Save ryszard/167150 to your computer and use it in GitHub Desktop.

Select an option

Save ryszard/167150 to your computer and use it in GitHub Desktop.
import functools
import threading
import time
class Unblocker(threading.Thread):
    """Background thread that releases `lock` after `t` seconds.

    When started, the thread sleeps for `t` seconds and then calls
    ``lock.release()``. The release is performed in a ``finally`` clause,
    so it also happens if the sleep itself raises.

    The thread is a daemon thread: a pending release must not keep the
    interpreter alive after the main thread has finished.

    Attributes:
        lock: object whose ``release()`` method is called by `run`.
        time: delay in seconds, passed to ``time.sleep``.
    """

    def __init__(self, lock, t):
        """Store the lock to release and the delay `t` (in seconds)."""
        super(Unblocker, self).__init__()
        self.lock = lock
        self.time = t
        # Must be set after Thread.__init__ and before start(). Without it
        # the process would hang at exit until the sleep is over.
        self.daemon = True

    def run(self):
        """Sleep for `self.time` seconds, then release `self.lock`."""
        try:
            time.sleep(self.time)
        finally:
            self.lock.release()
def call_interval(t):
    """Make sure that the decorated function won't be called again for
    at least `t` seconds, after each call.

    This is useful if you for example have a limit of maximum _n_
    calls to an API per second and you want to be on the safe side
    that you won't exceed it.

    Each decorated function gets its own lock. A call acquires the lock,
    runs the function, and then starts an `Unblocker` thread that releases
    the lock `t` seconds later; any call made in the meantime blocks until
    that release. The interval is therefore counted from the moment the
    previous call finished (whether it returned or raised).

    Args:
        t: minimum number of seconds between the end of one call and the
            start of the next.

    Returns:
        A decorator. The wrapped function keeps the original's metadata
        (name, docstring), return value and exceptions.
    """
    def decorator(fun):
        lock = threading.Lock()

        @functools.wraps(fun)
        def _wrapper(*a, **kw):
            # Not a ``with`` block on purpose: the lock is released later,
            # by the Unblocker thread, not when this call returns.
            lock.acquire()
            try:
                return fun(*a, **kw)
            finally:
                try:
                    Unblocker(lock, t).start()
                except Exception:
                    # The delayed release could not be scheduled; free the
                    # lock now so later calls do not deadlock forever.
                    lock.release()
                    raise
        return _wrapper
    return decorator
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment