Skip to content

Instantly share code, notes, and snippets.

@trAve3113r
Last active October 19, 2018 12:23
Show Gist options
  • Save trAve3113r/c9ac9ea0c783e33f92147fd2b0c1159b to your computer and use it in GitHub Desktop.
Save trAve3113r/c9ac9ea0c783e33f92147fd2b0c1159b to your computer and use it in GitHub Desktop.
# Toy solution: not production-ready, but food for thought.
# https://jeffknupp.com/blog/2014/02/11/a-celerylike-python-task-queue-in-55-lines-of-code/
# The following article shows how to daemonize a job using supervisord:
# https://adamj.eu/tech/2014/08/16/time-to-move-on-from-cron/
# GitHub repository of the `schedule` library used below:
# https://github.com/dbader/schedule
# --------------------------------------- #
# some common functions
# --------------------------------------- #
import Queue
import time
import threading
import schedule
def job():
    """Demo job: print a fixed message so it is visible that the job ran."""
    message = "I'm working"
    print(message)
# Threading example
def worker_main():
    """Run callables taken from the module-level ``jobqueue``, forever.

    Each item fetched from the queue is called with no arguments. A job that
    raises an ``Exception`` has its traceback printed and is then dropped, so
    a single failing job does not terminate the worker and leave the queue
    without a consumer. ``task_done()`` is called for every fetched item,
    whether the job succeeded or not, so ``jobqueue.join()`` cannot hang on a
    failed job.

    The loop never returns; only a ``BaseException`` that is not an
    ``Exception`` (for example ``SystemExit``) escapes it.
    """
    import traceback

    while True:
        job_func = jobqueue.get()  # blocks until an item is available
        try:
            job_func()
        except Exception:
            # Report the failure and keep consuming instead of dying.
            traceback.print_exc()
        finally:
            # Balance every get() with exactly one task_done().
            jobqueue.task_done()
# Queue shared between the scheduling loop below (producer) and the worker
# thread (consumer).
jobqueue = Queue.Queue()

# Register five identical schedule entries; every 10 seconds each entry
# enqueues `job` rather than running it in the scheduling loop itself.
for _ in range(5):
    schedule.every(10).seconds.do(jobqueue.put, job)

worker_thread = threading.Thread(target=worker_main)
# The worker blocks forever waiting on the queue; as a daemon thread it does
# not keep the process alive once the main loop below is interrupted.
worker_thread.daemon = True
worker_thread.start()

# Scheduling loop: once per second, fire whatever entries are due.
while True:
    schedule.run_pending()
    time.sleep(1)
# Catching exceptions
# Alternative approach (sub-classing the scheduler):
# https://gist.github.com/mplewis/8483f1c24f2d6259aef6
import functools
def catch_exceptions(job_func=None, cancel_on_failure=False):
    """Wrap a job so that exceptions it raises are printed instead of propagating.

    Can be applied in three ways:

    * ``catch_exceptions(func, cancel_on_failure=...)`` -- direct call;
    * ``@catch_exceptions`` -- bare decorator;
    * ``@catch_exceptions(cancel_on_failure=...)`` -- decorator with options.

    Args:
        job_func: The callable to wrap. When omitted, a decorator is returned
            that expects the callable as its single argument.
        cancel_on_failure: If true, the wrapper returns ``schedule.CancelJob``
            after a failure; otherwise it returns ``None`` after a failure.

    Returns:
        The wrapped callable, or a decorator when ``job_func`` is omitted.
        On success the wrapper returns whatever ``job_func`` returns.
    """
    if job_func is None:
        # Called with options only: hand back a decorator that remembers them.
        return functools.partial(
            catch_exceptions, cancel_on_failure=cancel_on_failure
        )

    @functools.wraps(job_func)
    def wrapper(*args, **kwargs):
        try:
            return job_func(*args, **kwargs)
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt and SystemExit still propagate.
            import traceback
            print(traceback.format_exc())
            if cancel_on_failure:
                return schedule.CancelJob

    return wrapper
# functools.partial pre-binds cancel_on_failure; the decorator syntax then
# supplies bad_task as the job_func argument of catch_exceptions.
@functools.partial(catch_exceptions, cancel_on_failure=True)
def bad_task():
    """Deliberately failing job: always raises ZeroDivisionError."""
    return 1 / 0


# Register the wrapped job to run every 5 minutes.
schedule.every(5).minutes.do(bad_task)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment