Created
June 20, 2019 00:03
-
-
Save planetceres/89c8e6a6f52656463b5ed38815d1ede3 to your computer and use it in GitHub Desktop.
Threaded Cron Execution Timer Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# reference: https://stackoverflow.com/a/13151299 | |
'''
Usage:

    def hello(name):
        print("Hello %s!" % name)

    print("starting...")
    rt = RateScheduler(1, hello, "World")  # it auto-starts, no need of rt.start()
    try:
        sleep(5)  # your long-running job goes here...
    finally:
        rt.stop()  # better in a try/finally block to make sure the program ends!
'''
import threading | |
import time | |
from time import sleep | |
class RateScheduler(object):
    """Repeatedly call ``function(*args, **kwargs)`` every ``interval`` seconds.

    Each tick is driven by a one-shot ``threading.Timer``. The next timer is
    armed *before* ``function`` is invoked, so the time ``function`` takes
    does not delay the following tick (and calls may overlap if ``function``
    runs longer than ``interval``). If ``function`` raises, the exception
    propagates on the timer thread; the next tick has already been armed.

    The scheduler starts itself on construction. Call ``stop()`` to end it;
    the timer threads are not daemon threads, so a scheduler that is never
    stopped keeps the process alive.

    Attributes:
        interval: Seconds between ticks, passed to ``threading.Timer``.
        function: The callable invoked on every tick.
        args: Positional arguments forwarded to ``function``.
        kwargs: Keyword arguments forwarded to ``function``.
        is_running: True between ``start()`` and ``stop()``.
    """

    def __init__(self, interval, function, *args, **kwargs):
        """Store the schedule parameters and start ticking immediately."""
        self._timer = None
        # Serializes start(), stop() and the timer callback so that a
        # callback racing with stop() cannot re-arm the timer afterwards.
        self._lock = threading.Lock()
        self.interval = interval
        self.function = function
        self.args = args
        self.kwargs = kwargs
        self.is_running = False
        self.start()

    def _schedule(self):
        """Arm a fresh one-shot timer; the caller must hold ``self._lock``."""
        self._timer = threading.Timer(self.interval, self._run)
        self._timer.start()

    def _run(self):
        """Timer callback: re-arm the timer, then invoke the user function."""
        with self._lock:
            if not self.is_running:
                # stop() won the race against this already-fired timer:
                # do not re-arm and do not invoke the function.
                return
            self._schedule()
        # Called outside the lock so the function may itself call stop()
        # or start() without deadlocking.
        self.function(*self.args, **self.kwargs)

    def start(self):
        """Begin ticking; a no-op if the scheduler is already running."""
        with self._lock:
            if not self.is_running:
                self._schedule()
                self.is_running = True

    def stop(self):
        """Cancel the pending timer and prevent any further ticks."""
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()
            self.is_running = False
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment