Skip to content

Instantly share code, notes, and snippets.

@wiljdaws
Created August 4, 2023 23:41
Show Gist options
  • Save wiljdaws/403cc5c11756ed6872ed2d95fad50914 to your computer and use it in GitHub Desktop.
Save wiljdaws/403cc5c11756ed6872ed2d95fad50914 to your computer and use it in GitHub Desktop.
scheduler to run tasks
import time
import logging
import schedule
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(message)s", datefmt="%Y-%m-%d %H:%M:%S")
class AdvancedScheduler:
def __init__(self, start_time, end_time, interval):
self.start_time = start_time
self.end_time = end_time
self.interval = interval
def task(self):
logging.info("Running task.")
# Replace the following line with your actual task logic
print(f"Task - Time: {time.strftime('%Y-%m-%d %H:%M:%S')}")
def run(self):
if time.time() >= self.start_time:
self.schedule_task()
else:
delay = self.start_time - time.time()
logging.info(f"Waiting for {delay} seconds to start the scheduler.")
time.sleep(delay)
self.schedule_task()
def schedule_task(self):
if self.interval == "hourly":
schedule.every().hour.at(":00").do(self.task)
elif self.interval == "quarterly":
total_duration = self.end_time - self.start_time
quarter_duration = total_duration / 4
for i in range(1, 5):
quarter_time = self.start_time + (quarter_duration * i)
if self.start_time < quarter_time <= self.end_time:
schedule.every().day.at(time.strftime("%H:%M", time.localtime(quarter_time))).do(self.task)
else:
logging.error("Invalid interval. Supported intervals: 'hourly', 'quarterly'")
return
while time.time() < self.end_time:
schedule.run_pending()
time.sleep(1)
# Example usage
if __name__ == "__main__":
# Set the start and end time of the shift (in seconds since the epoch)
start_time = int(time.time()) + 5 # Start 5 seconds from now
end_time = start_time + 60 * 60 # End 1 hour from the start time
# Define the interval for running the task ('hourly' or 'quarterly')
interval = "quarterly"
scheduler = AdvancedScheduler(start_time, end_time, interval)
scheduler.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment