Skip to content

Instantly share code, notes, and snippets.

@fabiocerqueira
Created January 3, 2017 16:17
Show Gist options
  • Select an option

  • Save fabiocerqueira/749efa8ea68f196e4169989dbfd3cf7b to your computer and use it in GitHub Desktop.

Select an option

Save fabiocerqueira/749efa8ea68f196e4169989dbfd3cf7b to your computer and use it in GitHub Desktop.
Simple example of running periodic Python functions with asyncio
import asyncio
from collections import namedtuple
from functools import wraps
# Registry of every function decorated with ``periodic``. The decorator
# appends one PeriodicTask per decorated function, and the ``__main__``
# section iterates it to perform the initial scheduling.
tasks = []

# Create the event loop explicitly and install it as the current loop.
# Calling asyncio.get_event_loop() when no loop is current is deprecated in
# recent Python releases and raises RuntimeError on Python 3.14+.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# One registry entry:
#   task     -- the wrapped callable to hand to the event loop
#   interval -- delay between runs, passed to loop.call_later()
#   start    -- if true, the first run is scheduled with loop.call_soon()
#               instead of after ``interval``
PeriodicTask = namedtuple('PeriodicTask', ['task', 'interval', 'start'])
def periodic(interval, start=False):
    """Build a decorator that registers a function as a periodic task.

    The decorated function is replaced by a wrapper that, each time it is
    called, runs the original function and then schedules itself again on
    the module-level ``loop`` after ``interval``. The wrapper is also
    recorded in the module-level ``tasks`` registry as a ``PeriodicTask``.

    Args:
        interval: Delay handed to ``loop.call_later()`` between runs.
        start: Stored on the registry entry; the ``__main__`` section uses
            it to decide whether the first run happens immediately.

    Returns:
        A decorator that takes the function and returns its wrapper.

    Note:
        Rescheduling happens only after the function returns, so a call
        that raises is not rescheduled.
    """
    # Function-scope import: the file only imports ``wraps`` from functools.
    from functools import partial

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            ret = func(*args, **kwargs)
            # Re-run with the same arguments this call received.
            # call_later() cannot forward keyword arguments itself, so bind
            # them with partial; rescheduling the bare wrapper would call a
            # parameterised task with no arguments on its next run.
            loop.call_later(interval, partial(wrapper, *args, **kwargs))
            return ret

        tasks.append(
            PeriodicTask(task=wrapper, interval=interval, start=start)
        )
        return wrapper

    return decorator
@periodic(interval=1)
def task1():
    """Print ``task1``.

    Registered through ``periodic`` with an interval of 1 and ``start``
    left at its default of False.
    """
    print('task1')
@periodic(interval=10, start=True)
def task2():
    """Print ``task2``.

    Registered through ``periodic`` with an interval of 10 and
    ``start=True``.
    """
    print('task2')
if __name__ == '__main__':
    # Schedule the first run of every registered task, in registration order.
    for task, interval, start in tasks:
        if not start:
            # Not flagged for immediate start: wait one interval first.
            loop.call_later(interval, task)
            continue
        # Flagged with start=True: run as soon as the loop begins.
        loop.call_soon(task)

    # Run until the loop is stopped or interrupted; always close it after.
    try:
        loop.run_forever()
    finally:
        loop.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment