Last active
November 6, 2022 04:45
-
-
Save ibrezm1/dfa1c959640c0f93b37eb7d028a74359 to your computer and use it in GitHub Desktop.
Flask demo for APScheduler cron-type jobs. Delete the SQLite file (jobs.sqlite) before running.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Demonstrating APScheduler feature for small Flask App. """ | |
from apscheduler.schedulers.background import BackgroundScheduler | |
from flask import Flask | |
from datetime import datetime | |
from pytz import timezone | |
# Time zone used when the demo prints timestamps.
tz = timezone('US/Eastern')
# Print the current time in that zone once, at import time.
print(datetime.now(tz))
# https://gist.github.com/ivanleoncz/21293b00d0ea54db8ee3b57fb1170ddf
# strftime pattern used by job(); append ' %Z%z' to also show the zone
# name and UTC offset.
fmt = '%Y-%m-%d %H:%M:%S'
def sensor():
    """Print a fixed heartbeat line; used as a trivial scheduled job."""
    print("Scheduler is alive!")
from datetime import datetime | |
def job(job_type):
    """Print an alarm line with the current time and the job's label.

    Args:
        job_type: Label identifying which schedule fired; it is
            interpolated into the printed message as-is.

    The timestamp is taken in the module-level ``tz`` zone and rendered
    with the module-level ``fmt`` pattern.
    """
    print(f'Alarm! {datetime.now(tz).strftime(fmt)} This alarm was scheduled at {job_type}')
# Scheduler configured through APScheduler's flat, string-keyed option dict.
sched = BackgroundScheduler({
    # Default job store: jobs are kept in the SQLite file jobs.sqlite.
    'apscheduler.jobstores.default': {
        'type': 'sqlalchemy',
        'url': 'sqlite:///jobs.sqlite'
    },
    # Default executor: a thread pool.
    'apscheduler.executors.default': {
        'class': 'apscheduler.executors.pool:ThreadPoolExecutor',
        'max_workers': '20'
    },
    # Extra process-pool executor; no job in this file selects it.
    'apscheduler.executors.processpool': {
        'type': 'processpool',
        'max_workers': '5'
    },
    'apscheduler.job_defaults.coalesce': 'false',
    'apscheduler.job_defaults.max_instances': '3',
    'apscheduler.timezone': 'US/Eastern',
})
# The jobs below go into a persistent (SQLite) job store, so each one gets
# an explicit id plus replace_existing=True. Without both, every restart of
# the app stores another copy of each job in jobs.sqlite. A store file
# written before these ids were introduced may still contain old copies
# under auto-generated ids and needs a one-time delete.

# Heartbeat every 60 seconds.
sched.add_job(sensor, 'interval', seconds=60,
              id='sensor_heartbeat', replace_existing=True)

# Every minute of every hour.
sched.add_job(job, 'cron', hour='*', minute='*',
              args=['Every hour & Min'],
              id='every_hour_and_min', replace_existing=True)

# Every cron field spelled out: every 5 seconds, all day, on November 6.
cron_time_dict = {'year': '*', 'month': 11, 'day': 6, 'week': '*',
                  'day_of_week': '*', 'hour': '*', 'minute': '*',
                  'second': '*/5'}
sched.add_job(job, 'cron', **cron_time_dict,
              args=['all possible & Min'],
              id='all_fields_nov_6', replace_existing=True)

# Monday to Friday during hour 17.
sched.add_job(job, 'cron', day_of_week='mon-fri', hour=17,
              args=['monday2friday'],
              id='monday2friday', replace_existing=True)

# Every 5 minutes during hour 17, every day.
sched.add_job(job, 'cron', hour=17, minute='*/5',
              args=['job 2'],
              id='job_2', replace_existing=True)

# Third Friday of June, July, August, November and December,
# at hours 0 through 4.
sched.add_job(job, 'cron', month='6-8,11-12', day='3rd fri',
              hour='0-4', args=['job 4'],
              id='job_4', replace_existing=True)

# Monday to Friday at 6:30 AM until 2022-06-30.
# NOTE(review): that end date looks like it is already in the past, so this
# job presumably never fires -- confirm whether that is intended.
sched.add_job(job, 'cron', day_of_week='mon-fri', hour=6,
              minute=30, end_date='2022-06-30',
              args=['job 5'],
              id='job_5', replace_existing=True)

# Every 2 hours within the 12-23 hour range (12, 14, ..., 22).
sched.add_job(job, 'cron', hour='12-23/2', args=['job 7'],
              id='job_7', replace_existing=True)

# When many tasks are scheduled for the same instant they cause a sudden
# spike in activity. The jitter option adds a random offset (here up to
# 60 seconds) to each run to spread them out.
# https://coderslegacy.com/python/apscheduler-cron-trigger/
sched.add_job(job, 'cron', hour='*', jitter=60, args=['job 8'],
              id='job_8', replace_existing=True)

# Start the background scheduler; the jobs added above become active now.
sched.start()
# The Flask application object for this demo.
app = Flask(__name__)


@app.route("/home")
def home():
    """Serve a fixed greeting at /home (test endpoint)."""
    return "Welcome Home :) !"
if __name__ == "__main__":
    # host 0.0.0.0 listens on all network interfaces, not only localhost.
    app.run(host='0.0.0.0', port=8111)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment