Skip to content

Instantly share code, notes, and snippets.

@rgov
Last active January 20, 2020 18:03
Show Gist options
  • Select an option

  • Save rgov/8ef98a65cef2f952cd875048eeefbd2e to your computer and use it in GitHub Desktop.

Select an option

Save rgov/8ef98a65cef2f952cd875048eeefbd2e to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
import collections
import datetime
import heapq
import logging
import threading
class Event \
(collections.namedtuple('Event', 'time interval action args kwargs')):
def __eq__(self, o): return self.time == o.time
def __lt__(self, o): return self.time < o.time
def __le__(self, o): return self.time <= o.time
def __gt__(self, o): return self.time > o.time
def __ge__(self, o): return self.time >= o.time
class Scheduler(threading.Thread):
def __init__(self, logger=None):
super().__init__(daemon=True)
self.logger = logger or logging.getLogger('Scheduler')
self.cv = threading.Condition()
self.qlock = threading.RLock()
self.stop = False
self.queue = []
def shutdown(self):
self.logger.debug('Shutting down')
self.stop = True
with self.cv:
self.cv.notify()
def at(self, when, action, *args, interval=None, **kwargs):
evt = Event(when, interval, action, args, kwargs)
self.logger.debug('Scheduling event %r', evt)
with self.qlock:
heapq.heappush(self.queue, evt)
with self.cv:
self.cv.notify()
return evt
def after(self, delay, action, *args, interval=None, **kwargs):
if not isinstance(delay, datetime.timedelta):
delay = datetime.timedelta(seconds=delay)
when = datetime.datetime.now() + delay
return self.at(when, action, *args, interval=interval, *kwargs)
def every(self, interval, action, *args, now=False, **kwargs):
if not isinstance(interval, datetime.timedelta):
interval = datetime.timedelta(seconds=interval)
return self.after(0 if now else interval,
action, *args, interval=interval, **kwargs)
def cancel(self, evt):
with self.qlock:
self.queue.remove(evt)
heapq.heapify(self.queue)
self.logger.debug(f'Canceling event %r', evt)
with self.cv:
self.cv.notify()
def handle_next_event(self):
# Wait until notified that the queue is updated and non-empty
with self.cv:
while True:
if self.stop:
return
self.qlock.acquire()
if self.queue:
break
self.qlock.release()
self.cv.wait()
# We are now holding our queue lock, and we know the queue is not empty
nextevt = self.queue[0]
# If the event time has passed, pop it from the queue and execute
now = datetime.datetime.now()
diff = (nextevt.time - now).total_seconds()
if diff <= 0:
heapq.heappop(self.queue)
if nextevt.interval is not None:
newevt = nextevt._replace(time=nextevt.time + nextevt.interval)
heapq.heappush(self.queue, newevt)
self.qlock.release()
self.logger.info('Invoking event %r (%.2f ms late)',
nextevt, -1000*diff)
try:
nextevt.action(*nextevt.args, **nextevt.kwargs)
except:
self.logger.error('Caught an exception while handling event %r',
nextevt, exc_info=True)
return
# The event time has not passed, wait until then
self.qlock.release()
with self.cv:
self.logger.info('Waiting %.2f seconds for event', nextevt)
interrupted = self.cv.wait(diff)
# Note: event callback will fire on next iteration, provided it was
# not canceled in the meantime
def run(self):
while not self.stop:
self.handle_next_event()
if __name__ == '__main__':
import time
s = Scheduler()
s.after(6, s.shutdown)
s.after(5, print, 'Finished!')
s.every(1, print, 'Tick')
s.start()
e = s.after(2, print, 'Not this one')
s.cancel(e)
while s.is_alive():
time.sleep(1)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment