Created
September 28, 2018 03:55
-
-
Save dtranhuusm/7e4ce23e721bb38a3e120de33bb75a18 to your computer and use it in GitHub Desktop.
Sample code using decorator for caching celery+redis result
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" Module to test remote execution using Celery queue with Redis backend. | |
Setup: | |
* a redis database must be running and accessible. | |
* the environment variable redis_host must be defined with value <ip>:<port>
* celery and redis packages must be installed | |
* celery worker(s) must be running this script: | |
```celery worker -A tests.test_remote -l info --autoscale=5,1``` | |
Execute test: | |
pytest tests/test_remote.py | |
License: GPL v3 | |
""" | |
import functools
import hashlib
import os

import pandas as pd
import redis
from celery import Celery
from celery.result import AsyncResult

redis_host = os.getenv('redis_host', '')
assert(redis_host)
# Direct Redis connection used for the task-id cache.  ``redis_host`` has the
# form ``<ip>:<port>``; pass the port along so this client talks to the same
# server as the Celery URL built below (6379 is used when no port is given).
_redis_ip, _, _redis_port = redis_host.partition(':')
ri = redis.Redis(host=_redis_ip,
                 port=int(_redis_port) if _redis_port else 6379)

# Celery uses database 0 of that server as both broker and result backend.
host = 'redis://{0}/0'.format(redis_host)
print(host)

# Lifetime, in seconds, of both the cached task ids and the Celery results.
cache_expires = 3600
# Prefix of every Redis key written by the task-id cache.
cache_prefix = 'remote_call'

app = Celery('test_remote', backend=host, broker=host)
# NOTE(security): pickle is used so that DataFrames can be returned as task
# results, but unpickling executes arbitrary code -- only use this
# configuration with a broker/backend that untrusted parties cannot write to.
app.conf.update(accept_content=['pickle'],
                task_serializer='pickle',
                result_serializer='pickle',
                result_expires=cache_expires,
                result_cache_max=0)
# somehow was not able to get celery caching to work with result_cache_max=0
def flush_cache():
    """Delete every Redis key whose name starts with ``cache_prefix``.

    Does nothing when no such key exists.
    """
    keys = ri.keys(cache_prefix + '*')
    # DEL needs at least one key name, so skip the call on an empty cache
    # instead of letting the server reject it.
    if keys:
        ri.delete(*keys)
@app.task
def remote_call(fn, *args, **kwargs):
    """Celery task: call the module-level function named ``"_" + fn``.

    Args:
        fn: name of the public function; the implementation that is looked
            up in this module's globals carries a leading underscore.
        *args: positional arguments forwarded to the implementation.
        **kwargs: keyword arguments forwarded to the implementation.

    Returns:
        Whatever the implementation returns.

    Raises:
        KeyError: if this module has no global named ``"_" + fn``.
    """
    return globals()["_" + fn](*args, **kwargs)
def remote_enable(cache=True):
    """Build a decorator that runs the decorated function on a Celery worker.

    While ``redis_host`` is set, the wrapper does not execute the decorated
    function's own body.  It submits ``remote_call`` with the decorated
    function's name and blocks until the task result is available.  When
    ``redis_host`` is empty the decorated function is called in-process.

    Args:
        cache: when true, the Celery task id is stored in Redis under a key
            derived from the function name and the ``str()`` of its
            arguments.  A repeated call with the same arguments made before
            that key expires (``cache_expires`` seconds) fetches the result
            of the earlier task instead of submitting a new one.

    Returns:
        A decorator producing a wrapper with the decorated function's
        signature and metadata.
    """
    def decorator(function):
        @functools.wraps(function)
        def wrapper(*args, **kwargs):
            if not redis_host:
                # No queue configured: run locally.
                return function(*args, **kwargs)

            if not cache:
                task = remote_call.delay(function.__name__, *args, **kwargs)
                return task.get()

            # Cache key: prefix + MD5 of the function name and the textual
            # form of the arguments.
            signature = function.__name__ + str(args) + str(kwargs)
            digest = hashlib.md5(signature.encode('utf-8')).hexdigest()
            cache_key = cache_prefix + digest

            task_id = ri.get(cache_key)
            if task_id:
                # Seen before: attach to the task that was already submitted.
                task = AsyncResult(task_id)
            else:
                task = remote_call.delay(function.__name__, *args, **kwargs)
                # Use set(..., ex=...) rather than setex(): the positional
                # order of setex's value/time arguments differs between
                # redis-py releases, the keyword form is unambiguous.
                ri.set(cache_key, task.id, ex=cache_expires)
            return task.get()
        return wrapper
    return decorator
@remote_enable(True)
def execute_select(db, query):
    """Public entry point for ``_execute_select``, with result caching on.

    Args:
        db: forwarded unchanged to ``_execute_select``.
        query: forwarded unchanged to ``_execute_select``.

    Returns:
        The value returned by ``_execute_select``.
    """
    result = _execute_select(db, query)
    return result
def _execute_select(db, query):
    """Dummy "select": echo the arguments back as a one-column DataFrame.

    Prints a banner containing this module's ``__name__`` and returns a
    DataFrame whose single column, labelled with that same ``__name__``,
    holds ``db`` in row 0 and ``query`` in row 1.
    """
    banner = 'Executing %s with parameters:' % (__name__)
    print(banner)
    rows = [db, query]
    return pd.DataFrame({__name__: rows})
@remote_enable(True)
def execute_empty():
    """Public entry point for ``_execute_empty``, with result caching on.

    Returns:
        The value returned by ``_execute_empty``.
    """
    # The implementation is ``_execute_empty``; the previously referenced
    # ``_test_empty`` is not defined anywhere in this module.
    return _execute_empty()
def _execute_empty():
    """Return a new, empty DataFrame."""
    empty_frame = pd.DataFrame()
    return empty_frame
@remote_enable(False)
def execute_update(db, query):
    """Public entry point for ``_execute_update``, with result caching off.

    Args:
        db: forwarded unchanged to ``_execute_update``.
        query: forwarded unchanged to ``_execute_update``.

    Returns:
        The value returned by ``_execute_update``.
    """
    outcome = _execute_update(db, query)
    return outcome
def _execute_update(db, query):
    """Dummy "update": print a banner, then each argument on its own line.

    The banner contains this module's ``__name__``.  Nothing is returned.
    """
    banner = 'Executing %s with parameters:' % (__name__)
    print(banner)
    for value in (db, query):
        print(value)
def test_run():
    """Integration test of remote execution and of the task-id cache.

    Needs a reachable Redis server and at least one running Celery worker.
    Every check is a real ``assert`` so that a wrong result fails the test.
    """
    def cached_entries():
        # Number of task-id cache keys currently stored in Redis.
        return len(ri.keys(cache_prefix + '*'))

    # validate that there is nothing in the cache
    assert cached_entries() == 0

    df = execute_select("test", "select * from test")
    # validate returned dataframe
    assert df.iloc[0, 0] == "test"
    assert df.iloc[1, 0] == "select * from test"
    # validate the execution created one entry in the cache
    assert cached_entries() == 1

    df1 = execute_select("test", "select * from test")
    # validate retrieval from cache
    assert df1.iloc[0, 0] == "test"
    assert df1.iloc[1, 0] == "select * from test"
    # validate there is no new entry added to the cache
    assert cached_entries() == 1

    df2 = execute_empty()
    # validate there is no issue with empty dataframe
    assert df2.empty
    # validate there is a new entry in the cache
    assert cached_entries() == 2

    execute_update("test", "insert into test values")
    # validate there is no new entry as execute_update is specified not to cache
    assert cached_entries() == 2

    flush_cache()
    # validate the cache is flushed
    assert cached_entries() == 0


# Allow running the integration test directly, without pytest.
if __name__ == "__main__":
    test_run()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment