Created
August 4, 2023 23:41
-
-
Save wiljdaws/403cc5c11756ed6872ed2d95fad50914 to your computer and use it in GitHub Desktop.
scheduler to run tasks
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import logging | |
import schedule | |
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S") | |
class AdvancedScheduler: | |
def __init__(self, start_time, end_time, interval): | |
self.start_time = start_time | |
self.end_time = end_time | |
self.interval = interval | |
def task(self): | |
logging.info("Running task.") | |
# Replace the following line with your actual task logic | |
print(f"Task - Time: {time.strftime('%Y-%m-%d %H:%M:%S')}") | |
def run(self): | |
if time.time() >= self.start_time: | |
self.schedule_task() | |
else: | |
delay = self.start_time - time.time() | |
logging.info(f"Waiting for {delay} seconds to start the scheduler.") | |
time.sleep(delay) | |
self.schedule_task() | |
def schedule_task(self): | |
if self.interval == "hourly": | |
schedule.every().hour.at(":00").do(self.task) | |
elif self.interval == "quarterly": | |
total_duration = self.end_time - self.start_time | |
quarter_duration = total_duration / 4 | |
for i in range(1, 5): | |
quarter_time = self.start_time + (quarter_duration * i) | |
if self.start_time < quarter_time <= self.end_time: | |
schedule.every().day.at(time.strftime("%H:%M", time.localtime(quarter_time))).do(self.task) | |
else: | |
logging.error("Invalid interval. Supported intervals: 'hourly', 'quarterly'") | |
return | |
while time.time() < self.end_time: | |
schedule.run_pending() | |
time.sleep(1) | |
# Example usage | |
if __name__ == "__main__": | |
# Set the start and end time of the shift (in seconds since the epoch) | |
start_time = int(time.time()) + 5 # Start 5 seconds from now | |
end_time = start_time + 60 * 60 # End 1 hour from the start time | |
# Define the interval for running the task ('hourly' or 'quarterly') | |
interval = "quarterly" | |
scheduler = AdvancedScheduler(start_time, end_time, interval) | |
scheduler.run() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment