Last active
January 20, 2020 18:03
-
-
Save rgov/8ef98a65cef2f952cd875048eeefbd2e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| import collections | |
| import datetime | |
| import heapq | |
| import logging | |
| import threading | |
class Event(
        collections.namedtuple('Event', 'time interval action args kwargs')):
    """A scheduled callback, ordered by its ``time`` field.

    Fields:
        time:     when the callback is due.
        interval: re-scheduling period, or ``None`` for a one-shot event.
        action:   the callable to invoke.
        args:     positional arguments for ``action``.
        kwargs:   keyword arguments for ``action``.

    The ordering operators (``<``, ``<=``, ``>``, ``>=``) look only at
    ``time`` so that events can be stored in a ``heapq`` without ever
    comparing callables or argument containers.

    Equality is deliberately NOT overridden: ``==`` and ``!=`` use the
    field-wise tuple comparison inherited from the namedtuple.  Treating two
    events as equal merely because they are due at the same instant made
    ``==`` disagree with ``!=`` and caused ``list.remove(evt)`` to drop
    whichever same-time event happened to come first.
    """

    def __lt__(self, o):
        if not isinstance(o, Event):
            return NotImplemented
        return self.time < o.time

    def __le__(self, o):
        if not isinstance(o, Event):
            return NotImplemented
        return self.time <= o.time

    def __gt__(self, o):
        if not isinstance(o, Event):
            return NotImplemented
        return self.time > o.time

    def __ge__(self, o):
        if not isinstance(o, Event):
            return NotImplemented
        return self.time >= o.time
class Scheduler(threading.Thread):
    """Daemon thread that runs callbacks at scheduled times.

    Pending events live in ``self.queue``, a ``heapq``-ordered list whose
    first element is the event that is due soonest.  ``self.cv`` is a
    condition variable built on top of ``self.qlock``, so a single lock both
    protects the queue and carries wake-up notifications.  Sharing the lock
    means a producer cannot push an event and notify in the gap between the
    worker inspecting the queue and the worker going to sleep.

    Callbacks are invoked on the scheduler thread with the lock released, so
    a callback may itself call ``at``/``after``/``every``/``cancel``/
    ``shutdown``.
    """

    def __init__(self, logger=None):
        """Create an idle scheduler; call ``start()`` to begin processing.

        Args:
            logger: logger to report to; defaults to the logger named
                ``'Scheduler'``.
        """
        super().__init__(daemon=True)
        self.logger = logger or logging.getLogger('Scheduler')
        self.qlock = threading.RLock()
        # Same underlying lock as qlock: see the class docstring.
        self.cv = threading.Condition(self.qlock)
        self.stop = False
        self.queue = []

    def shutdown(self):
        """Ask the scheduler thread to exit and wake it if it is waiting."""
        self.logger.debug('Shutting down')
        with self.cv:
            self.stop = True
            self.cv.notify_all()

    def at(self, when, action, *args, interval=None, **kwargs):
        """Schedule ``action(*args, **kwargs)`` to run at time ``when``.

        Args:
            when: ``datetime.datetime`` at which the event is due.
            action: callable to invoke.
            *args: positional arguments for ``action``.
            interval: optional ``datetime.timedelta``; when given, the event
                is re-queued ``interval`` after its due time each time it
                fires.
            **kwargs: keyword arguments for ``action``.

        Returns:
            The queued ``Event``, usable with ``cancel``.
        """
        evt = Event(when, interval, action, args, kwargs)
        self.logger.debug('Scheduling event %r', evt)
        with self.cv:
            heapq.heappush(self.queue, evt)
            # The new event may be due sooner than whatever the worker is
            # currently sleeping for.
            self.cv.notify()
        return evt

    def after(self, delay, action, *args, interval=None, **kwargs):
        """Schedule ``action`` to run once ``delay`` has elapsed from now.

        Args:
            delay: a ``datetime.timedelta`` or a number of seconds.
            action, *args, interval, **kwargs: forwarded to ``at``.

        Returns:
            The queued ``Event``.
        """
        if not isinstance(delay, datetime.timedelta):
            delay = datetime.timedelta(seconds=delay)
        when = datetime.datetime.now() + delay
        # Keyword arguments must be forwarded with ``**``; a single ``*``
        # would pass the keyword *names* as extra positional arguments.
        return self.at(when, action, *args, interval=interval, **kwargs)

    def every(self, interval, action, *args, now=False, **kwargs):
        """Schedule ``action`` to run repeatedly, ``interval`` apart.

        Args:
            interval: a ``datetime.timedelta`` or a number of seconds.
            action: callable to invoke.
            *args: positional arguments for ``action``.
            now: when true the first run is due immediately, otherwise after
                one ``interval``.
            **kwargs: keyword arguments for ``action``.

        Returns:
            The ``Event`` for the first occurrence.  Each later occurrence is
            a new ``Event`` object created when the previous one fires.
        """
        if not isinstance(interval, datetime.timedelta):
            interval = datetime.timedelta(seconds=interval)
        return self.after(0 if now else interval,
                          action, *args, interval=interval, **kwargs)

    def cancel(self, evt):
        """Remove a pending event from the queue.

        The event is matched by identity, so only the exact object returned
        by ``at``/``after``/``every`` is removed, never a different event
        that merely compares equal to it.

        Raises:
            ValueError: if ``evt`` is not currently queued (for example it
                already fired, or it is a recurring event that has since
                been replaced by its next occurrence).
        """
        with self.cv:
            for idx, queued in enumerate(self.queue):
                if queued is evt:
                    break
            else:
                raise ValueError('event is not in the scheduler queue')
            del self.queue[idx]
            # Deleting from the middle breaks the heap invariant.
            heapq.heapify(self.queue)
            self.logger.debug('Canceling event %r', evt)
            self.cv.notify()

    def handle_next_event(self):
        """Run the earliest event if it is due, otherwise wait for it.

        Blocks while the queue is empty.  Returns without doing anything once
        ``shutdown`` has been requested.  If the earliest event is not yet
        due, sleeps until its due time or until the queue changes and then
        returns; the caller is expected to call again, at which point the
        head of the queue is re-evaluated.
        """
        with self.cv:
            # Wait until there is something to look at (or we are stopping).
            while True:
                if self.stop:
                    return
                if self.queue:
                    break
                self.cv.wait()

            nextevt = self.queue[0]
            # Use the event's own tzinfo so both naive and timezone-aware
            # due times can be subtracted (tzinfo None gives naive local now).
            now = datetime.datetime.now(nextevt.time.tzinfo)
            diff = (nextevt.time - now).total_seconds()

            if diff > 0:
                # Not due yet: sleep while still holding the lock, so a
                # concurrent at()/cancel()/shutdown() cannot notify before
                # we are actually waiting.
                self.logger.info('Waiting %.2f seconds for event', diff)
                self.cv.wait(diff)
                return

            heapq.heappop(self.queue)
            if nextevt.interval is not None:
                # Recurring event: queue the next occurrence before running.
                newevt = nextevt._replace(
                    time=nextevt.time + nextevt.interval)
                heapq.heappush(self.queue, newevt)

        # Lock released: run the callback without blocking producers.
        self.logger.info('Invoking event %r (%.2f ms late)',
                         nextevt, -1000 * diff)
        try:
            nextevt.action(*nextevt.args, **nextevt.kwargs)
        except Exception:
            # A failing callback must not kill the scheduler thread, but
            # SystemExit/KeyboardInterrupt are allowed to propagate.
            self.logger.error('Caught an exception while handling event %r',
                              nextevt, exc_info=True)

    def run(self):
        """Thread body: process events until ``shutdown`` is requested."""
        while not self.stop:
            self.handle_next_event()
if __name__ == '__main__':
    # Small demonstration: tick once per second, print a final message after
    # five seconds, and stop the scheduler after six.
    import time

    scheduler = Scheduler()
    scheduler.after(6, scheduler.shutdown)
    scheduler.after(5, print, 'Finished!')
    scheduler.every(1, print, 'Tick')
    scheduler.start()

    # Queue one more event and cancel it straight away.
    doomed = scheduler.after(2, print, 'Not this one')
    scheduler.cancel(doomed)

    # The scheduler is a daemon thread, so keep the main thread alive until
    # it has finished.
    while scheduler.is_alive():
        time.sleep(1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment