Created
March 26, 2020 13:28
-
-
Save kinnou02/5a2b4977fb8eef9ebfd857c22880ba66 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def lock_release(lock, logger):
    """Release ``lock`` and keep it releasable if the release fails.

    The token held in ``lock.local.token`` (when present) is saved before
    calling ``lock.release()``. If the release raises, the saved token is
    written back so that a later retry can attempt the release again, the
    failure is logged through ``logger.exception`` and the original
    exception is re-raised.

    :param lock: object exposing ``release()`` and, optionally, a ``local``
        attribute carrying a ``token`` (presumably a redis-py ``Lock`` --
        confirm against the caller).
    :param logger: logger-like object exposing ``exception(msg)``.
    :raises: whatever ``lock.release()`` raised, unchanged.
    """
    # We store the token of the lock to be able to restore it later in case
    # of error. ``None`` means "nothing to restore".
    token = getattr(getattr(lock, 'local', None), 'token', None)
    try:
        lock.release()
    except BaseException:
        # Deliberately as broad as the original bare ``except``: the token is
        # restored whatever interrupted the release, then the error is re-raised.
        if token is not None:
            # Release failed and the token has been invalidated, any retry
            # would fail; we restore the token so that, in case of a
            # connection error, we can reconnect and release the lock.
            # ``is not None`` (rather than truthiness) so that a falsy but
            # valid token is restored too.
            lock.local.token = token
        logger.exception("exception when trying to release lock, will retry")
        raise
class Lock(object):
    """
    Decorator running a bound task under a non-blocking redis lock.

    The lock key is built from the name of the instance attached to the job
    (``'tyr.lock|' + job.instance.name``). If the lock is already held, or
    redis cannot be reached while locking, the task is rescheduled through
    ``task.retry``.

    The decorated function must take the bound task as ``self`` (passed
    positionally) and a ``job_id`` argument.

    usage:
    @celery.task(bind=True)
    @Lock(timeout=30 * 60)
    def mytask(self, job_id):
        pass
    """

    def __init__(self, timeout):
        # Passed as-is as the ``timeout`` argument of ``redis.lock``.
        self.timeout = timeout

    @staticmethod
    def _bound_task(func, args):
        """Return the positional argument bound to ``func``'s ``self`` parameter."""
        # ``__code__`` is available on Python 2.6+ and 3.x, whereas
        # ``func_code`` only exists on Python 2.
        return args[func.__code__.co_varnames.index('self')]

    def __call__(self, func):
        """Wrap ``func`` so that it only runs while holding the instance lock."""

        @wraps(func)
        def wrapper(*args, **kwargs):
            job_id = get_named_arg('job_id', func, args, kwargs)
            logging.debug('args: %s -- kwargs: %s', args, kwargs)
            job = models.Job.query.get(job_id)
            logger = get_instance_logger(job.instance, task_id=job_id)
            task = self._bound_task(func, args)
            try:
                lock = redis.lock('tyr.lock|' + job.instance.name, timeout=self.timeout)
                locked = lock.acquire(blocking=False)
            except ConnectionError:
                logging.exception('Exception with redis while locking. Retrying in 10sec')
                # Explicit ``raise`` makes it obvious that execution stops
                # here, so ``lock``/``locked`` are never read while unbound.
                raise task.retry(countdown=10, max_retries=10)

            if not locked:
                # Somebody else holds the lock: reschedule the task later.
                countdown = 300
                logger.info('lock on %s retry %s in %s sec', job.instance.name, func.__name__, countdown)
                raise task.retry(countdown=countdown, max_retries=10)

            try:
                logger.debug('lock acquired on %s for %s', job.instance.name, func.__name__)
                return func(*args, **kwargs)
            finally:
                logger.debug('release lock on %s for %s', job.instance.name, func.__name__)
                # sometimes we are disconnected from redis when we want to release the lock,
                # so we retry only the release
                try:
                    retrying.Retrying(stop_max_attempt_number=5, wait_fixed=1000).call(
                        lock_release, lock, logger
                    )
                except ValueError:  # LockError(ValueError) since redis 3.0
                    # Best effort: the task result is kept even though the
                    # lock could not be released.
                    logger.exception(
                        "impossible to release lock: continue but following task may be locked :("
                    )

        return wrapper
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment