Created
August 26, 2013 13:22
-
-
Save akaihola/6341351 to your computer and use it in GitHub Desktop.
Recurring dynamic Celery task manager
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import time
from datetime import datetime, timedelta
class InvalidOperation(Exception):
    """Signals that a RepeatingCacheManager method was used out of order.

    Raised by ``save_and_unlock()`` when the instance did not acquire the
    refresh lock, and by the ``eta`` property when ``save_and_unlock()`` has
    not been called yet.
    """
class RepeatingCacheManager(object):
    """Recurring dynamic Celery task manager

    This class can be used in Celery tasks which should refresh a cache key
    repeatedly with a given interval. It uses locking to make sure that the
    same cache key isn't already being updated. This way we prevent another
    HTTP request from triggering simultaneous tasks.

    Redis is used for locking, and a Django cache backend for caching the
    actual value. The Redis lock key is ``'<cache key>:lock'``. Each instance
    adds its start time (an integer UNIX timestamp) to it on creation and
    subtracts it again when it backs off or releases the lock, so ``0``
    means "unlocked". An instance taking over a timed-out lock overwrites the
    value with its own start time.

    Usage:

    In your Celery task, instantiate the manager with your cache key and the
    desired interval in seconds. You may also specify an offset in seconds to
    have e.g. a task which occurs 15 seconds after every 5-minute mark, a
    timeout after which a task still holding the lock is considered dead
    (half the interval by default), and an optional cache backend if you
    don't want to use the default backend. Note that the fourth positional
    argument is ``timeout``, so pass the cache by keyword::

        @task
        def mytask(<arguments>):
            cacher = RepeatingCacheManager(<cache key>, <interval>, <offset>,
                                           cache=<cache>)

    Then check the value of the :attr:`need_refresh` attribute, and if it's
    ``False``, do nothing, since it means there's another task already working
    on the cache key::

            if not cacher.need_refresh:
                return

    If :attr:`need_refresh` is ``True``, obtain the value you need to cache and
    call the :meth:`save_and_unlock` method to store it in the cache and
    release the lock::

            value = None
            try:
                value = <obtain value>
            finally:
                cacher.save_and_unlock(value)

    Finally, if the value was successfully obtained, re-schedule your task at
    the next interval using the :attr:`eta` attribute::

            mytask.apply_async(args=[<arguments>], eta=cacher.eta)

    A complete example::

        @task
        def mytask(cache_key, link):
            cacher = RepeatingCacheManager(cache_key, interval=3600, offset=60)
            if not cacher.need_refresh:
                return
            value = None
            try:
                value = get_clicks_last_hour(link)
            finally:
                cacher.save_and_unlock(value)
            mytask.apply_async(args=[cache_key, link], eta=cacher.eta)

        def statistics_view(request, link):
            cache_key = 'clicks-last-hour:{0}'.format(link)
            value = cache.get(cache_key)
            # compare with None: a legitimate count of 0 clicks is falsy
            if value is None:
                value = '(reload to see number of clicks last hour)'
                mytask.delay(cache_key, link)
            return render_to_response('clicks.html', {'clicks': value})
    """

    def __init__(self, cache_key, interval=600, offset=0, timeout=None,
                 cache=None, redis=None):
        """Try to acquire the refresh lock for *cache_key*.

        Sets :attr:`need_refresh` to ``True`` if this instance now owns the
        lock (no holder, or the holder timed out), ``False`` otherwise.

        :param cache_key: cache key the value is stored under; the Redis lock
            key is derived from it.
        :param interval: refresh interval in seconds; also used as the cache
            timeout of the stored value.
        :param offset: seconds added to each interval boundary when computing
            :attr:`eta`.
        :param timeout: seconds after which an instance still holding the
            lock is considered dead and the lock is taken over. ``None``
            means ``interval // 2``; ``0`` means any holder counts as dead.
        :param cache: Django cache backend to store the value in; defaults to
            ``django.core.cache.cache``.
        :param redis: client used for locking (needs ``incr``, ``decr`` and
            ``set``); defaults to ``get_redis()``.
        :raises ValueError: if the lock value was negative before this
            instance incremented it.
        """
        self.cache_key = cache_key
        self.interval = interval
        self.offset = offset
        # NOTE(review): get_redis() is neither defined nor imported in this
        # module; presumably the surrounding project provides it.
        self.redis = get_redis() if redis is None else redis
        if cache is None:
            # use settings.CACHES['default'] if cache not specified
            from django.core.cache import cache as default_cache
            cache = default_cache
        self.cache = cache
        self._eta = None
        self.cache_lock_key = '{0}:lock'.format(cache_key)
        self.epoch = int(time.time())
        if timeout is None:
            # Only fall back to the default when no timeout was given, so an
            # explicit 0 is honoured instead of being treated as "unset".
            timeout = self.interval // 2

        # Atomically add our start time to the lock; whatever was there
        # before is the start time(s) other instances left behind.
        lock_value = self.redis.incr(self.cache_lock_key, self.epoch)
        other_epochs = lock_value - self.epoch
        if other_epochs < 0:
            raise ValueError(
                'RepeatingCacheManager value for {0} in Redis '
                'was {1} before incrementing!'
                .format(self.cache_lock_key, other_epochs))
        if other_epochs == 0:
            # no other task running, we need to refresh cache
            self.need_refresh = True
            return
        elapsed_since_other_task = self.epoch - other_epochs
        if elapsed_since_other_task < timeout:
            # another instance of this task with the same arguments was started
            # and timeout hasn't yet passed -> undo our increment and cancel
            self.redis.decr(self.cache_lock_key, self.epoch)
            self.need_refresh = False
            return
        # Another instance of this task has timed out.
        # Reset lock and refresh cache.
        self.redis.set(self.cache_lock_key, self.epoch)
        self.need_refresh = True

    def save_and_unlock(self, result):
        """Cache *result*, release the lock and compute :attr:`eta`.

        The value is stored for ``interval`` seconds. The ETA is the next
        multiple of ``interval`` seconds after local midnight (as given by
        ``datetime.now()``), plus ``offset`` seconds.

        :param result: value to store under ``cache_key``.
        :raises InvalidOperation: if this instance did not acquire the lock
            (:attr:`need_refresh` is ``False``).
        """
        # sanity check
        if not self.need_refresh:
            raise InvalidOperation(
                "RepeatingCacheManager can't save value to cache since there "
                'is another similar task already running.')
        # cache the value and release our share of the lock
        self.cache.set(self.cache_key, result, self.interval)
        remaining = self.redis.decr(self.cache_lock_key, self.epoch)
        if remaining < 0:
            # This instance timed out and a newer one reset the lock to its
            # own start time; subtracting our stale start time drove the
            # counter below zero, which would make the next constructor call
            # raise ValueError. Put the lock back into the "unlocked" state.
            self.redis.set(self.cache_lock_key, 0)
        # re-schedule this at the next interval plus offset,
        # by default 10 minutes after the last 10-minute mark
        now = datetime.now()
        midnight = datetime(now.year, now.month, now.day)
        seconds_since_midnight = (now - midnight).seconds
        last_interval = (
            self.interval * (seconds_since_midnight // self.interval))
        next_interval = timedelta(
            seconds=last_interval + self.interval + self.offset)
        self._eta = midnight + next_interval

    @property
    def eta(self):
        """Datetime at which the task should be re-scheduled.

        :raises InvalidOperation: if :meth:`save_and_unlock` has not been
            called yet.
        """
        # sanity check:
        if self._eta is None:
            raise InvalidOperation('RepeatingCacheManager.save_and_unlock() '
                                   'not called, no ETA yet available.')
        return self._eta
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from contextlib import nested | |
from datetime import datetime, timedelta | |
from django.core.cache import cache as default_cache | |
from django.test import TestCase | |
from mock import Mock, patch | |
# see https://gist.github.com/3312399 | |
from cache_switch import cache_switch | |
import repeating_cache_manager as rcm | |
def patch_time(return_value):
    """Return a context manager that freezes the clock seen by ``rcm``.

    While active, ``rcm.time.time()`` returns ``return_value``. Also while
    active, ``rcm.datetime.now()`` returns 2012-09-10 00:00 plus
    ``return_value`` seconds. Any other use of ``rcm.datetime`` (for example
    constructing a ``datetime``) is forwarded to the real class via
    ``wraps``. Since ``rcm.time`` is the ``time`` module that ``rcm``
    imported, ``time.time`` is patched for the whole process while the
    context is active.

    :param return_value: number of seconds to report.
    :returns: a single-use context manager applying both patches.
    """
    # contextlib.nested is deprecated since Python 2.7 and removed in
    # Python 3; a generator-based context manager enters and exits the two
    # patches in the same order without depending on it.
    from contextlib import contextmanager

    datetime_mock = Mock(
        wraps=datetime,
        now=Mock(return_value=(
            datetime(2012, 9, 10) + timedelta(seconds=return_value))))

    @contextmanager
    def _patched():
        with patch.object(rcm.time, 'time', Mock(return_value=return_value)):
            with patch.object(rcm, 'datetime', datetime_mock):
                yield

    return _patched()
@cache_switch('locmem://')
class RepeatingCacheManager_Tests(TestCase):
    """Integration tests for RepeatingCacheManager locking and scheduling.

    Each test freezes the clock with ``patch_time``, uses a 10-second
    interval with a 2-second timeout, and checks ``need_refresh``, the raw
    Redis lock value, the cached value and the computed ETA.
    """
    # pylint: disable=E1101
    # Instance of <class> has no <member>

    # ``settings`` was referenced here without ever being imported, so the
    # module failed with NameError. Import it locally without leaking it as
    # a class attribute. A missing TEST_MODE setting now simply disables
    # these tests instead of raising AttributeError.
    from django.conf import settings as _settings
    __test__ = getattr(_settings, 'TEST_MODE', None) == 'integration'
    del _settings

    def setUp(self):
        # NOTE: FLUSHALL wipes all data on the Redis server returned by
        # rcm.get_redis(), not only the keys used by these tests.
        rcm.get_redis().flushall()

    def assertLockValue(self, value):
        """Assert the raw Redis lock value of ``self.cacher`` equals *value*."""
        self.assertEqual(str(value),
                         rcm.get_redis().get(self.cacher.cache_lock_key))

    def test_first_call(self):
        """Cache refresh is needed on first call"""
        with patch_time(100):
            self.cacher = rcm.RepeatingCacheManager('key',
                                                    interval=10,
                                                    offset=0,
                                                    timeout=2)
            self.assertTrue(self.cacher.need_refresh)
            self.assertLockValue(100)
            self.cacher.save_and_unlock('value')
        self.assertEqual('value', default_cache.get('key'))
        self.assertLockValue(0)
        self.assertEqual(datetime(2012, 9, 10, 0, 1, 50), self.cacher.eta)

    def test_second_call(self):
        """Cache refresh is needed on second call if first call completed"""
        with patch_time(100):
            self.cacher = rcm.RepeatingCacheManager('key',
                                                    interval=10,
                                                    offset=0,
                                                    timeout=2)
            self.assertTrue(self.cacher.need_refresh)
            self.cacher.save_and_unlock('value')
        with patch_time(110):
            self.cacher = rcm.RepeatingCacheManager('key',
                                                    interval=10,
                                                    offset=0,
                                                    timeout=2)
            self.assertTrue(self.cacher.need_refresh)
            self.assertLockValue(110)
            self.cacher.save_and_unlock('new value')
        self.assertEqual('new value', default_cache.get('key'))
        self.assertLockValue(0)
        self.assertEqual(datetime(2012, 9, 10, 0, 2), self.cacher.eta)

    def test_second_call_when_first_running(self):
        """Cache refresh not needed on second call if first call working"""
        with patch_time(100):
            self.cacher = rcm.RepeatingCacheManager('key',
                                                    interval=10,
                                                    offset=0,
                                                    timeout=2)
            self.assertTrue(self.cacher.need_refresh)
            self.assertLockValue(100)
        with patch_time(101):
            self.cacher2 = rcm.RepeatingCacheManager('key',
                                                     interval=10,
                                                     offset=0,
                                                     timeout=2)
            self.assertLockValue(100)
            self.assertFalse(self.cacher2.need_refresh)
            self.cacher.save_and_unlock('new value')
        self.assertLockValue(0)
        self.assertEqual(datetime(2012, 9, 10, 0, 1, 50), self.cacher.eta)

    def test_second_call_when_first_timeout(self):
        """Cache refresh needed on second call if first call timeout"""
        with patch_time(100):
            self.cacher = rcm.RepeatingCacheManager('key',
                                                    interval=10,
                                                    offset=0,
                                                    timeout=2)
            self.assertTrue(self.cacher.need_refresh)
        with patch_time(103):
            self.cacher2 = rcm.RepeatingCacheManager('key',
                                                     interval=10,
                                                     offset=0,
                                                     timeout=2)
            self.assertTrue(self.cacher2.need_refresh)
            self.assertLockValue(103)
            self.cacher2.save_and_unlock('new value')
        self.assertLockValue(0)
        self.assertRaises(rcm.InvalidOperation, lambda: self.cacher.eta)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment