Created November 4, 2016 22:38
Save wolever/3cf2305613052f3810a271e09d42e35c to your computer and use it in GitHub Desktop.
A task debouncer for Celery.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time | |
import redis | |
def get_redis_connection():
    """Return a Redis client used by ``TaskDebouncer``.

    redis-py has no module-level ``redis.connect()``; ``redis.Redis()``
    builds a client for the default server (localhost:6379, db 0).
    Replace this with your project's connection factory if Redis lives
    elsewhere.
    """
    return redis.Redis()
class TaskDebouncer(object):
    """ A simple Celery task debouncer.

    Usage::

        def debounce_process_corpus(corpus):
            # Only one task with ``key`` will be allowed to execute at a
            # time. For example, if the task was resizing an image, the key
            # might be the image's URL.
            key = "process_corpus:%s" %(corpus.id, )
            TaskDebouncer.delay(
                key, process_corpus, args=[corpus.id], countdown=0,
            )

        @task(bind=True)
        def process_corpus(self, corpus_id, debounce_key=None):
            debounce = TaskDebouncer(debounce_key, keepalive=30)
            corpus = Corpus.load(corpus_id)
            try:
                for item in corpus:
                    item.process()
                    # If ``debounce.keepalive()`` isn't called every
                    # ``keepalive`` interval (the ``keepalive=30`` in the
                    # call to ``TaskDebouncer(...)``) the task will be
                    # considered dead and another one will be allowed to
                    # start.
                    debounce.keepalive()
            finally:
                # ``finalize()`` will mark the task as complete and allow
                # subsequent tasks to execute. It returns true when the
                # debounce key changed or disappeared while this task was
                # running: the key expired (``keepalive()`` was not called
                # often enough), and possibly a newer ``delay()`` call
                # re-created it. Depending on your business logic, this
                # might indicate that the task should be retried.
                needs_retry = debounce.finalize()
                if needs_retry:
                    raise self.retry(max_retries=None)

    How it works: ``delay()`` stores the current Unix timestamp under
    ``key`` using ``SET NX`` and schedules the task with
    ``debounce_key="<key>!<timestamp>"``. While that key exists, further
    ``delay()`` calls schedule nothing. The running task extends the key's
    TTL with ``keepalive()`` and deletes it in ``finalize()``.

    NOTE(review): because ``delay()`` never overwrites an existing key,
    ``delay()`` calls made while a task is *running* are dropped (they
    return ``first`` falsy) rather than recorded, so ``finalize()`` does not
    report them. Callers that must not lose such requests should check the
    value returned by ``delay()``.
    """

    def __init__(self, key, keepalive=60):
        """Attach to the debounce key of a running task.

        :param key: the ``debounce_key`` that ``delay()`` passed to the task
            (``"<key>!<timestamp>"``), a bare key, or a falsy value. When
            falsy, all Redis operations are skipped and ``finalize()``
            returns ``False``.
        :param keepalive: TTL in seconds that ``keepalive()`` applies to the
            key by default.
        """
        if key:
            self.key = self._base_key(key)
            self.run_key = key
        else:
            self.key = None
            self.run_key = None
        self._keepalive = keepalive
        self.cxn = get_redis_connection()
        self.init()
        # ``delay()`` only gave the key a ``countdown + 10`` second TTL;
        # extend it right away now that the task is running.
        self.keepalive()

    @staticmethod
    def _base_key(run_key):
        """Return ``run_key`` with ``delay()``'s ``!<timestamp>`` suffix removed.

        Splits on the *last* ``!`` so base keys that themselves contain
        ``!`` (e.g. URLs) survive intact. A key without a numeric suffix is
        returned unchanged.
        """
        base, sep, suffix = run_key.rpartition("!")
        if sep and suffix.isdigit():
            return base
        return run_key

    @classmethod
    def delay(cls, key, task, args=None, kwargs=None, countdown=30):
        """Schedule ``task`` unless one is already pending or running for ``key``.

        :param key: debounce key; requests that should be coalesced must use
            the same key.
        :param task: Celery task, invoked via ``apply_async`` with an extra
            ``debounce_key`` keyword argument.
        :param args: positional arguments for the task.
        :param kwargs: keyword arguments for the task (the caller's dict is
            copied, not mutated).
        :param countdown: seconds before the task runs. The key initially
            expires ``countdown + 10`` seconds after it is created.
        :returns: ``(first, run_key)``. ``first`` is truthy when this call
            created the key and scheduled the task. ``run_key`` is
            ``"<key>!<timestamp>"`` for the task that owns the key.
        """
        cxn = get_redis_connection()
        while True:
            now = int(time.time())
            first = cxn.set(key, now, nx=True, ex=countdown + 10)
            if first:
                break
            existing = cxn.get(key)
            if existing is not None:
                # redis-py returns bytes on Python 3 unless the client uses
                # ``decode_responses``. Decode so run_key reads
                # "<key>!<timestamp>" rather than "<key>!b'<timestamp>'".
                if isinstance(existing, bytes):
                    existing = existing.decode("utf-8")
                now = existing
                break
            # The key vanished between SET NX and GET (its task finalized or
            # it expired). Loop to try to claim it again instead of
            # returning "<key>!None" and silently dropping this request.
        run_key = "%s!%s" % (key, now)
        if first:
            kwargs = dict(kwargs or {})
            kwargs["debounce_key"] = run_key
            task.apply_async(args=args, kwargs=kwargs, countdown=countdown)
        return (first, run_key)

    def init(self):
        """Record the key's current value; ``finalize()`` compares against it."""
        self.initial = self.key and self.cxn.get(self.key)

    def keepalive(self, expire=None):
        """Reset the key's TTL to ``expire`` seconds (default: ``keepalive``).

        If this is not called within the TTL, the key expires. The running
        task is then considered dead, and ``delay()`` can schedule a new one.
        """
        if self.key is None:
            return
        expire = expire if expire is not None else self._keepalive
        self.cxn.expire(self.key, expire)

    def is_out_of_date(self):
        """Return True if the key's value differs from the one read in ``init()``."""
        if self.key is None:
            return False
        return self.cxn.get(self.key) != self.initial

    def finalize(self):
        """Release the debounce key if it still holds the value seen by ``init()``.

        Uses WATCH/MULTI so the key is deleted only if it was not changed
        concurrently. The transaction is retried on ``WatchError``.

        :returns: ``True`` if the key's value no longer matches (it expired
            or was re-created by another ``delay()``). In that case the key
            is left untouched. Otherwise ``False``, including when ``key``
            is None.
        """
        if self.key is None:
            return False
        with self.cxn.pipeline() as pipe:
            while True:
                try:
                    pipe.watch(self.key)
                    if pipe.get(self.key) != self.initial:
                        return True
                    pipe.multi()
                    pipe.delete(self.key)
                    pipe.execute()
                    break
                except redis.WatchError:
                    # Another client touched the key mid-transaction; retry.
                    continue
        return False
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.