Skip to content

Instantly share code, notes, and snippets.

@TheWaWaR
Last active May 16, 2021 09:14
Show Gist options
  • Save TheWaWaR/1a578fadf6a9263d6d2a551daad0bd4b to your computer and use it in GitHub Desktop.
Save TheWaWaR/1a578fadf6a9263d6d2a551daad0bd4b to your computer and use it in GitHub Desktop.
<Advanced Python Scheduler> date trigger example
# coding: utf-8
import os
import time
from datetime import datetime, timedelta
from pytz import utc, timezone
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.triggers.date import DateTrigger
def the_job(oid):
print('Run job: object.id={}, datetime={}'.format(oid, datetime.now()))
def main():
sqlite_file = 'apscheduler-jobs.sqlite'
os.system('rm -f {}'.format(sqlite_file))
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///{}'.format(sqlite_file)),
}
executors = {
'default': ThreadPoolExecutor(20),
}
job_defaults = {
'coalesce': False,
'max_instances': 1
}
scheduler = BackgroundScheduler(
jobstores=jobstores, executors=executors,
job_defaults=job_defaults, timezone=utc
)
scheduler.start()
def add_job(oid):
trigger = DateTrigger(
run_date=datetime.now() + timedelta(seconds=oid),
timezone=timezone('Asia/Shanghai')
)
scheduler.add_job(the_job, args=[oid],
trigger=trigger,
id='task-{}'.format(oid),
name='Task({})'.format(oid),
coalesce=True, max_instances=1)
add_job(3)
add_job(6)
add_job(9)
try:
print('Press <Ctrl + C> to stop! datetime={}'.format(datetime.now()))
time.sleep(1000000000)
except KeyboardInterrupt:
print('Stopping...')
scheduler.shutdown()
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment