Skip to content

Instantly share code, notes, and snippets.

@aaronpolhamus
Created August 9, 2020 02:06
Show Gist options
  • Save aaronpolhamus/cb305a3350f943215d00b66c85f576ea to your computer and use it in GitHub Desktop.
Save aaronpolhamus/cb305a3350f943215d00b66c85f576ea to your computer and use it in GitHub Desktop.
task locking with redis + celery
"""Task locking with redis in celery is hard, and good examples are tough to come by. This is the approach that's
worked for me, based on great work that other folks have posted:
* https://breadcrumbscollector.tech/what-is-celery-beat-and-how-to-use-it-part-2-patterns-and-caveats/
* http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html
* https://redis.io/topics/distlock
This isn't polished, but hopefully it's useful. To verify it in our local test env, we register the following test task
in our definitions file:
*** definitions.py ***
@celery.task(name="async_test_task_lock")
@task_lock(main_key="async_test_task_lock", timeout=UPDATE_GAME_DATA_TIMEOUT)
def async_test_task_lock(game_id):
print(f"processing game_id {game_id}")
time.sleep(TASK_LOCK_TEST_SLEEP)
*** test_celery_tasks.py ***
and here's the test, using unittest.TestCase
from backend.tasks.definitions import async_test_task_lock, TASK_LOCK_TEST_SLEEP
from backend.tasks.redis_handlers import rds, TASK_LOCK_MSG
class TestTaskLocking(TestCase):
def test_task_locking(self):
rds.flushall()
res1 = async_test_task_lock.delay(3)
res2 = async_test_task_lock.delay(5)
self.assertFalse(res1.ready())
self.assertFalse(res2.ready())
res3 = async_test_task_lock.delay(5)
res4 = async_test_task_lock.delay(5)
self.assertEqual(res3.get(), TASK_LOCK_MSG)
self.assertEqual(res4.get(), TASK_LOCK_MSG)
time.sleep(TASK_LOCK_TEST_SLEEP)
res5 = async_test_task_lock.delay(3)
self.assertFalse(res5.ready())
"""
import base64
from contextlib import contextmanager
import functools
import json
import pickle as pkl
import uuid

from redis import StrictRedis
from redis_cache import RedisCache
from redlock import Redlock

from backend.config import Config
# Text client: decode_responses=True, so replies are decoded to str instead of bytes.
# NOTE(review): `charset` looks like the legacy spelling of redis-py's `encoding` kwarg --
# confirm it is still accepted by the pinned redis-py version.
rds = StrictRedis(Config.REDIS_HOST, decode_responses=True, charset="utf-8")
# Raw-bytes client (decode_responses=False); handed to RedisCache below, whose payloads are pickled.
rds_cache = StrictRedis(Config.REDIS_HOST, decode_responses=False, charset="utf-8")
# Function-result cache stored under the "rc" prefix, serialized with pickle.
# NOTE(review): pickle.loads can execute arbitrary code -- this is only safe while nothing
# untrusted can write to this Redis instance; confirm.
redis_cache = RedisCache(redis_client=rds_cache, prefix="rc", serializer=pkl.dumps, deserializer=pkl.loads)
# Redlock lock manager pointed at the same Redis host; not referenced by the code visible in this section.
dlm = Redlock([{"host": Config.REDIS_HOST}])
# Value returned by a task_lock-wrapped callable when the lock could not be acquired.
TASK_LOCK_MSG = "Task execution skipped -- another task already has the lock"
DEFAULT_ASSET_EXPIRATION = 8 * 24 * 60 * 60 # 8 days (days * hours * minutes * seconds): default lifetime for cached values
DEFAULT_CACHE_EXPIRATION = 1 * 24 * 60 * 60 # 1 day: shorter lifetime for values that can be dropped sooner
# Lua script executed with EVAL by redis_lock: deletes KEYS[1] only when its current value
# equals ARGV[1] (the token written by the lock holder); otherwise it returns 0 and deletes nothing.
REMOVE_ONLY_IF_OWNER_SCRIPT = """
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
"""
@contextmanager
def redis_lock(lock_name, expires=60):
    """Try to take a Redis-backed lock and yield whether it was acquired.

    The lock is a key named ``lock_name`` holding a random token, written with
    ``SET ... NX`` so that only one caller can create it. The context manager
    does not block or raise when the lock is already held: it yields ``False``
    and the caller is responsible for checking the yielded value.

    Args:
        lock_name: Redis key used as the lock.
        expires: value passed as ``ex`` to ``SET`` (key time-to-live); ``None``
            creates a key without expiry.

    Yields:
        bool: ``True`` if this caller created the key, ``False`` otherwise.
    """
    # https://breadcrumbscollector.tech/what-is-celery-beat-and-how-to-use-it-part-2-patterns-and-caveats/
    random_value = str(uuid.uuid4())
    lock_acquired = bool(
        rds.set(lock_name, random_value, ex=expires, nx=True)
    )
    print(f'Lock acquired? {lock_name} for {expires} - {lock_acquired}')
    try:
        yield lock_acquired
    finally:
        # Release in ``finally`` so an exception raised inside the ``with`` body
        # cannot leave the lock held until it expires (or forever if expires=None).
        if lock_acquired:
            # Release the lock BUT ONLY if we are still the owner, i.e. the stored
            # value is identical to the token we put there originally.
            rds.eval(REMOVE_ONLY_IF_OWNER_SCRIPT, 1, lock_name, random_value)
            print(f'Lock {lock_name} released!')
def argument_signature(*args, **kwargs):
    """Return a base64 string identifying a particular set of call arguments.

    Positional arguments are stringified and joined with ``_``; keyword
    arguments are rendered as ``name:value``, sorted by name, and joined with
    ``_``. The two parts are separated by ``-`` and the result is
    base64-encoded.

    Keyword arguments are sorted so that the same logical call produces the
    same signature regardless of the order in which keywords were passed.

    NOTE: values are compared by their ``str()`` form, so arguments whose
    string forms contain the separators can still collide
    (e.g. ``("a_b",)`` and ``("a", "b")``).
    """
    arg_list = [str(x) for x in args]
    # Keys are unique strings, so sorting the items never compares the values.
    kwarg_list = [f"{k}:{v}" for k, v in sorted(kwargs.items())]
    return base64.b64encode(f"{'_'.join(arg_list)}-{'_'.join(kwarg_list)}".encode()).decode()
def task_lock(func=None, main_key="", timeout=None):
    """Decorator that skips a call when an identical call already holds the lock.

    The lock name is ``f"{main_key}_{argument_signature(*args, **kwargs)}"``,
    so calls with different arguments lock independently. Usable both bare
    (``@task_lock``) and with arguments (``@task_lock(main_key=..., timeout=...)``).

    Args:
        func: the function to wrap when used as a bare decorator.
        main_key: prefix of the lock name.
        timeout: passed to ``redis_lock`` as ``expires``.
            NOTE(review): the default ``None`` overrides ``redis_lock``'s own
            default, which creates a lock key with no expiry -- confirm this is intended.

    Returns:
        The wrapped function. Calling it returns ``TASK_LOCK_MSG`` when the lock
        is held by someone else, otherwise the wrapped function's return value.
    """
    def _dec(run_func):
        # Preserve the wrapped function's name, docstring and signature so that
        # introspection (and anything deriving names from the function) sees the real task.
        @functools.wraps(run_func)
        def _caller(*args, **kwargs):
            lock_name = f"{main_key}_{argument_signature(*args, **kwargs)}"
            with redis_lock(lock_name, timeout) as acquired:
                if not acquired:
                    return TASK_LOCK_MSG
                return run_func(*args, **kwargs)
        return _caller
    return _dec(func) if func is not None else _dec
def unpack_redis_json(key: str):
    """Fetch ``key`` from Redis and decode its value as JSON.

    Returns ``None`` when the key is absent; otherwise the result of
    ``json.loads`` on the stored value.
    """
    raw = rds.get(key)
    return None if raw is None else json.loads(raw)
@al-the-x
Copy link

Found this gist looking for the source of another example that I found in a production codebase: https://gist.github.com/Skyross/2f4c95f5df2446b71f74f4f9d9771125

Thanks for your ideas on this!

We're using a custom celery.Task subtask like the above example, but I think that using Redis.lock would simplify the implementation significantly. Have you encountered any difficulty with it in production?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment