Skip to content

Instantly share code, notes, and snippets.

@asvetlov
Created April 27, 2014 11:32
Show Gist options
  • Save asvetlov/11343386 to your computer and use it in GitHub Desktop.
Save asvetlov/11343386 to your computer and use it in GitHub Desktop.
Keep track of the set of delayed calls that have not run yet
import asyncio
import functools
# Handles of delayed calls scheduled through call_later() whose callback
# has not finished running yet.
handles = set()


def call_later(delay, cb, *args, loop=None):
    """Schedule ``cb(*args)`` after ``delay`` seconds and track its handle.

    The handle returned by ``loop.call_later()`` is added to the
    module-level ``handles`` set and dropped from it as soon as the callback
    has run, whether it returned normally or raised.

    Args:
        delay: Delay passed through to ``loop.call_later()``.
        cb: Callable to invoke.
        *args: Positional arguments passed to ``cb``.
        loop: Event loop to schedule on; defaults to
            ``asyncio.get_event_loop()``.

    Returns:
        The handle returned by ``loop.call_later()``.

    Note:
        A handle that is cancelled before it fires never runs the wrapper,
        so it stays in ``handles`` until the caller removes it.
    """
    if loop is None:
        loop = asyncio.get_event_loop()

    @functools.wraps(cb)
    def wrapper(*args):
        try:
            return cb(*args)
        finally:
            # discard() rather than remove(): the caller may already have
            # taken the handle out of the set (e.g. by clearing it), and a
            # KeyError raised here would also hide any exception from cb.
            handles.discard(h)

    # h is bound before the loop can possibly invoke wrapper, so the
    # closure above always sees the final handle.
    h = loop.call_later(delay, wrapper, *args)
    handles.add(h)
    return h
def cb(arg):
    """Print ``arg`` prefixed with ``Call``; used as the demo callback."""
    print('Call', arg)
# Demo: schedule three delayed calls, run the loop long enough for the first
# two to fire, and print the tracked handles before and after.
# A fresh loop is created explicitly because asyncio.get_event_loop() is
# deprecated when no loop is running and fails on recent Python versions.
loop = asyncio.new_event_loop()
try:
    call_later(0.01, cb, 'a', loop=loop)
    call_later(0.02, cb, 'b', loop=loop)
    call_later(0.1, cb, 'c', loop=loop)
    print(handles)  # nothing has run yet: three handles are tracked
    loop.run_until_complete(asyncio.sleep(0.05))
    print(handles)  # expected: only the 0.1 s call is still tracked
finally:
    # Close the loop so it does not linger until interpreter shutdown.
    loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment