Last active
August 29, 2015 14:08
-
-
Save lost-theory/2e94ae20643ac32a6773 to your computer and use it in GitHub Desktop.
Subclass of rq_scheduler.Scheduler that enqueues a new job for each run instead of re-using the same job ID; written for https://github.com/ui/rq-scheduler/pull/43
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from datetime import datetime | |
import logging | |
logging.basicConfig(level=logging.DEBUG) | |
from redis import Redis | |
from rq_scheduler import Scheduler | |
from rq_scheduler.utils import to_unix | |
class CustomRQScheduler(Scheduler):
    """
    Scheduler variant that enqueues a brand-new job for every scheduled run.

    Behaves like the stock rq_scheduler.Scheduler except for enqueue_job,
    which hands the scheduled job's callable and arguments to the queue as a
    fresh job instead of re-using the scheduled job's ID. Per the original
    author, this fixes race conditions around overlapping runs and lets you
    manage expiration of results yourself.

    Background: https://github.com/ui/rq-scheduler/pull/43

    Written against rq-scheduler==0.5.0; be careful about running it with any
    other version.
    """

    def enqueue_job(self, job):
        """
        Enqueue one run of a scheduled job and reschedule it if it repeats.

        ``job`` is the scheduled job object (presumably an rq Job -- only its
        id, origin, meta, func, args, kwargs, timeout, result_ttl,
        enqueued_at and save() are used here). ``job.meta`` may carry an
        'interval' entry (added to the current unix time to get the next
        run) and a 'repeat' entry (remaining run counter).

        Always returns None.
        """
        self.log.debug('Pushing {0} to {1}'.format(job.id, job.origin))

        period = job.meta.get('interval')
        runs_left = job.meta.get('repeat')

        # A truthy repeat counter is consumed by this run.
        if runs_left:
            job.meta['repeat'] = int(runs_left) - 1

        # Stamp the scheduled job and save it (including the updated counter).
        job.enqueued_at = datetime.utcnow()
        job.save()

        # Hand the callable and its arguments to the queue via enqueue_call
        # rather than pushing the scheduled job's own ID, then drop the
        # scheduled entry for this run.
        target_queue = self.get_queue_for_job(job)
        target_queue.enqueue_call(job.func, job.args, job.kwargs, job.timeout, job.result_ttl)
        self.connection.zrem(self.scheduled_jobs_key, job.id)

        # Jobs without an interval are one-off: nothing to reschedule.
        if not period:
            return

        # A repeating job whose counter has reached 0 is finished.
        if runs_left is not None and job.meta['repeat'] == 0:
            return

        # Put the job back into the schedule, `interval` past the current time.
        next_run_at = to_unix(datetime.utcnow()) + int(period)
        self.connection._zadd(self.scheduled_jobs_key, next_run_at, job.id)
if __name__ == "__main__":
    # Run the custom scheduler against a Redis() connection built with
    # default arguments. interval=5 is passed straight to the scheduler
    # constructor (presumably the polling period in seconds -- confirm
    # against the rq-scheduler documentation).
    scheduler = CustomRQScheduler(connection=Redis(), interval=5)
    scheduler.run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment