Skip to content

Instantly share code, notes, and snippets.

@KunihikoKido
Created August 1, 2013 07:59
Show Gist options
  • Save KunihikoKido/6129346 to your computer and use it in GitHub Desktop.
Save KunihikoKido/6129346 to your computer and use it in GitHub Desktop.
Celery Single Task: キャッシュサーバーへロックIDを保存して制御します。
# -*- coding: utf-8 -*-
import hashlib
from celery.task import Task
from django.core.cache import cache
class SingleTask(Task):
    """Abstract Celery task that skips a run while the same key is locked.

    Before doing any work, ``run`` adds a lock entry to the Django cache
    under a key derived from the task name and the caller-supplied
    ``unique_name``.  If the entry already exists the run is skipped and
    ``None`` is returned; otherwise ``run_main`` is executed and the lock
    entry is deleted afterwards.

    Subclasses implement ``run_main``.  Callers pass the lock name as the
    first positional argument, followed by the arguments for ``run_main``::

        class AddTask(SingleTask):
            def run_main(self, x, y):
                return x + y

        add_task = AddTask()
        add_task.delay('addtask1', 2, 4)
    """

    # Timeout handed to cache.add() for the lock entry (60 * 5 = 300).
    # NOTE(review): a run_main that outlasts this timeout presumably loses
    # its lock while still running -- confirm this is acceptable.
    LOCK_EXPIRE = 60 * 5

    # Celery flag: this base class is not meant to be used as a task itself.
    abstract = True

    def make_lock_id(self, key_name):
        """Return the cache key used as the lock for ``key_name``.

        The key has the form ``<task name>-lock-<md5 hex digest of key_name>``.
        Text keys are encoded as UTF-8 before hashing; byte strings are
        hashed as-is.
        """
        if not isinstance(key_name, bytes):
            # hashlib only accepts bytes; encoding explicitly keeps text
            # keys (including non-ASCII ones) working on Python 2 and 3.
            key_name = key_name.encode('utf-8')
        hashed_key = hashlib.md5(key_name).hexdigest()
        return '%s-lock-%s' % (self.name, hashed_key)

    def acquire_lock(self, key_name):
        """Try to take the lock for ``key_name``.

        Returns the result of ``cache.add``, which only stores the entry
        when the key is not already present.
        """
        lock_id = self.make_lock_id(key_name)
        return cache.add(lock_id, 'true', self.LOCK_EXPIRE)

    def release_lock(self, key_name):
        """Delete the lock entry for ``key_name`` from the cache.

        Returns whatever ``cache.delete`` returns.
        """
        lock_id = self.make_lock_id(key_name)
        return cache.delete(lock_id)

    def make_unique_name(self, *args):
        """Join the string form of every argument with ``-``."""
        return '-'.join(str(arg) for arg in args)

    def run(self, unique_name, *args, **kwargs):
        """Run ``run_main`` unless ``unique_name`` is already locked.

        ``unique_name`` identifies the lock; the remaining positional and
        keyword arguments are forwarded to ``run_main`` unchanged.

        Returns the result of ``run_main``, or ``None`` when the lock could
        not be acquired.

        Any exception raised by ``run_main`` is logged and re-raised after
        the lock has been released.
        """
        logger = self.get_logger()
        logger.debug('Run: %s task' % unique_name)

        if not self.acquire_lock(unique_name):
            logger.debug(
                "%s is already running by another worker" % unique_name)
            return None

        try:
            return self.run_main(*args, **kwargs)
        except Exception as exc:
            logger.error("%s" % str(exc))
            # Propagate the real failure; swallowing it here would leave
            # the caller without a result and hide the original error.
            raise
        finally:
            # Always free the lock, whether run_main succeeded or failed.
            self.release_lock(unique_name)

    def run_main(self, *args, **kwargs):
        """Do the actual work of the task; override in subclasses.

        The default implementation does nothing and returns ``None``.
        """
        pass
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment