Last active
November 19, 2021 01:31
-
-
Save revmischa/796ac8d99b90c1b3aa10f3e90ef67fd7 to your computer and use it in GitHub Desktop.
Deferred jobs with sqlalchemy/flask/apscheduler
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# sample usage: | |
class MyFlaskApp(Flask): | |
... | |
def start_heartbeat(self): | |
"""Schedule periodic heartbeat updates.""" | |
if hasattr(self, 'heartbeat_sched'): | |
raise Exception("Heartbeat already started") | |
sched = self.get_scheduler() | |
self.heartbeat_sched = sched | |
sched.start() | |
sched.add_job( | |
func="jetbridge.app:scheduler_app_heartbeat", | |
trigger='interval', | |
seconds=self.config.get('HEARTBEAT_INTERVAL_SECONDS', 10), | |
id='heartbeat_{}'.format(self.env_host()), | |
args=dict(env_host=self.env_host()), | |
replace_existing=True, | |
) | |
def startup_job(self): | |
"""Insert a deferred job to log server startup success.""" | |
sched = self.get_scheduler() | |
sched.add_deferred_job( | |
func="jetbridge.app:scheduler_app_startup", | |
id='startup_{}'.format(self.env_host()), | |
args=dict(env_host=self.env_host()), | |
replace_existing=True, | |
) | |
def get_scheduler(self, scheduler_type=None): | |
"""Get a new APScheduler instance configured for our application.""" | |
self._init_scheduler_config() | |
sched = MyJobScheduler(scheduler=scheduler_type, app=self) | |
return sched | |
def _init_scheduler_config(self): | |
"""Load settings for Flask-APScheduler.""" | |
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore | |
from apscheduler.executors.pool import ProcessPoolExecutor | |
config = self.config | |
# some lazy-loaded dynamic defaults | |
dsn = config.get('SQLALCHEMY_DATABASE_URI') | |
self.config['SCHEDULER_JOBSTORES'] = { | |
'default': SQLAlchemyJobStore(url=dsn) | |
} | |
config['SCHEDULER_EXECUTORS'] = { | |
'default': ProcessPoolExecutor(config.get('SCHEDULER_WORKER_PROCS', 1)) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class MyJobScheduler(APScheduler): | |
"""Add some methods to Flask-APScheduler.""" | |
def start_paused(self, paused=True): | |
"""Run in paused mode.""" | |
self.scheduler.start(paused=paused) | |
def add_deferred_job(self, **kwargs): | |
"""Insert a job to be run outside of Flask-land. | |
This is the main method for adding a job to be serialized and run on a "clock" worker instance. | |
https://devcenter.heroku.com/articles/clock-processes-python | |
It takes the same format of arguments as Flask-APScheduler's add_job, such as func, trigger, seconds/minutes/hours, id, args. | |
The job is inserted via a new paused scheduler. | |
Make sure to specify an ID that can be used to coalesce unique jobs to ensure it is only run once. | |
Be sure to specify `replace_existing=True` if this ID may be non-unique and you wish to replace an older scheduled job. | |
""" | |
if 'id' not in kwargs: | |
raise ValueError("Please specify an ID that uniquely identifies this job.") | |
self.start_paused(paused=True) | |
self.add_job(**kwargs) | |
# can I shutdown/delete here? | |
# probably unnecessary if using replace_existing=True | |
def add_job_safe(self, scheduler, **kwargs): | |
session = get_db().session | |
job = None | |
jid = kwargs['id'] | |
try: | |
with session.begin_nested(): | |
job = scheduler.get_job(jid) # explodes if not found! :/ | |
if job: | |
raise Exception("job already exists") | |
job = scheduler.add_job(**kwargs) | |
except Exception as e: | |
log.error("Failed to add job {}, error: {}".format(jid, e)) | |
finally: | |
session.commit() | |
return job |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment