Skip to content

Instantly share code, notes, and snippets.

@dtranhuusm
Created September 28, 2018 03:55
Show Gist options
  • Save dtranhuusm/7e4ce23e721bb38a3e120de33bb75a18 to your computer and use it in GitHub Desktop.
Save dtranhuusm/7e4ce23e721bb38a3e120de33bb75a18 to your computer and use it in GitHub Desktop.
Sample code using decorator for caching celery+redis result
""" Module to test remote execution using Celery queue with Redis backend.
Setup:
* a redis database must be running and accessible.
* the environment variable redis_host must be defined with value <ip>:<port>
* celery and redis packages must be installed
* celery worker(s) must be running this script:
```celery worker -A tests.test_remote -l info --autoscale=5,1```
Execute test:
pytest tests/test_remote.py
License: GPL v3
"""
import os
import pandas as pd
# Address of the Redis server, read from the environment variable
# "redis_host"; falls back to an empty string when the variable is unset.
redis_host = os.environ.get('redis_host', '')
# Stop the import right away when no address was provided.
assert redis_host
from celery import Celery
from celery.result import AsyncResult
import redis
import hashlib
# Connection URL (database 0) used for the Celery broker, the Celery result
# backend and the cache client below.
host = 'redis://{0}/0'.format(redis_host)
# Build the cache client from the same URL so that it honours the port given
# in redis_host; passing only the host part would make the client use the
# default Redis port regardless of what was configured.
ri = redis.Redis.from_url(host)
print(host)
# Lifetime, in seconds, of the cached task ids and of the Celery results.
cache_expires = 3600
# Prefix shared by every cache key; flush_cache matches keys on it.
cache_prefix = 'remote_call'
app = Celery('test_remote', backend=host, broker=host)
# NOTE(review): pickle allows arbitrary Python objects to travel through the
# queue, but unpickling executes code -- only use with a trusted broker.
app.conf.update(
    accept_content=['pickle'],
    task_serializer='pickle',
    result_serializer='pickle',
    result_expires=cache_expires,
    result_cache_max=0,
)
# somehow was not able to get celery caching to work with result_cache_max=0
def flush_cache():
    """Delete every Redis key whose name starts with ``cache_prefix``.

    Does nothing when no such key exists.
    """
    keys = ri.keys(cache_prefix + '*')
    # DEL needs at least one key: calling delete() with no arguments makes
    # the server reject the command, so skip it when the cache is empty.
    if keys:
        ri.delete(*keys)
@app.task
def remote_call(fn, *args, **kwargs):
    """Celery task: run the module-level function named ``"_" + fn``.

    Args:
        fn: Name of the public function; the implementation that is looked
            up in this module's globals carries a leading underscore.
        *args: Positional arguments forwarded to the implementation.
        **kwargs: Keyword arguments forwarded to the implementation.

    Returns:
        Whatever the implementation returns.

    Raises:
        KeyError: If this module defines no global named ``"_" + fn``.
    """
    target_name = "_" + fn
    return globals()[target_name](*args, **kwargs)
def remote_enable(cache=True):
    """Build a decorator that runs the decorated function through Celery.

    When ``redis_host`` is set, a call to the decorated function is submitted
    as ``remote_call(function.__name__, *args, **kwargs)`` and the wrapper
    blocks until the result is available. When ``redis_host`` is empty the
    decorated function is simply called locally.

    Args:
        cache: When true, the id of the submitted task is stored in Redis
            under a key derived from the function name and the arguments.
            A later call with the same key reuses that task's result instead
            of submitting a new task. Entries expire after ``cache_expires``
            seconds.

    Returns:
        A decorator; the wrapper it produces returns the task result (or the
        local return value when ``redis_host`` is empty).
    """
    # Local import so this block does not rely on a file-level import.
    import functools

    def decorator(function):
        # Preserve the decorated function's name and docstring on the wrapper.
        @functools.wraps(function)
        def wrapper(*args, **kwargs):
            if not redis_host:
                return function(*args, **kwargs)

            if not cache:
                return remote_call.delay(
                    function.__name__, *args, **kwargs).get()

            # Cache key: prefix + md5 of the function name and the string
            # form of the arguments (used as a fingerprint, not for security).
            digest = hashlib.md5(
                (function.__name__ + str(args) + str(kwargs)).encode('utf-8')
            ).hexdigest()
            cache_key = cache_prefix + digest

            task_id = ri.get(cache_key)
            if task_id:
                # The Redis client returns bytes unless it was configured to
                # decode responses; hand a text id to Celery.
                if isinstance(task_id, bytes):
                    task_id = task_id.decode('utf-8')
                result = AsyncResult(task_id)
            else:
                result = remote_call.delay(function.__name__, *args, **kwargs)
                # set(..., ex=...) names the expiry explicitly; the positional
                # order of setex's value/time arguments is not the same in all
                # redis-py releases, so the keyword form is the safe one.
                ri.set(cache_key, result.id, ex=cache_expires)
            return result.get()

        return wrapper

    return decorator
@remote_enable(True)
def execute_select(db, query):
    """Cached entry point for a select; delegates to ``_execute_select``.

    Args:
        db: Value forwarded as the first argument.
        query: Value forwarded as the second argument.

    Returns:
        The value returned by ``_execute_select(db, query)``.
    """
    selected = _execute_select(db, query)
    return selected
def _execute_select(db, query):
print('Executing %s with parameters:' % (__name__))
return pd.DataFrame({__name__: [db, query]})
@remote_enable(True)
def execute_empty():
    """Cached entry point that returns an empty result.

    Returns:
        The value returned by ``_execute_empty()``.
    """
    # The implementation is named _execute_empty; the previously referenced
    # _test_empty is not defined in this module.
    return _execute_empty()
def _execute_empty():
return pd.DataFrame()
@remote_enable(False)
def execute_update(db, query):
    """Uncached entry point for an update; delegates to ``_execute_update``.

    Args:
        db: Value forwarded as the first argument.
        query: Value forwarded as the second argument.

    Returns:
        The value returned by ``_execute_update(db, query)``.
    """
    outcome = _execute_update(db, query)
    return outcome
def _execute_update(db, query):
print('Executing %s with parameters:' % (__name__))
for arg in [db, query]:
print(arg)
def test_run():
    """Integration test of the cached/uncached remote calls.

    Requires a reachable Redis server and a running Celery worker (see the
    module docstring). Each expectation is an ``assert`` so that a mismatch
    actually fails the test instead of being evaluated and discarded.
    """
    pattern = cache_prefix + '*'

    # validate that there is nothing in the cache
    assert len(ri.keys(pattern)) == 0

    df = execute_select("test", "select * from test")
    # validate returned dataframe
    assert df.iloc[0, 0] == "test"
    assert df.iloc[1, 0] == "select * from test"
    # validate the execution created one entry in the cache
    assert len(ri.keys(pattern)) == 1

    df1 = execute_select("test", "select * from test")
    # validate retrieval from cache
    assert df1.iloc[0, 0] == "test"
    assert df1.iloc[1, 0] == "select * from test"
    # validate there is no new entry added to the cache
    assert len(ri.keys(pattern)) == 1

    df2 = execute_empty()
    # validate there is no issue with empty dataframe
    assert df2.empty
    # validate there is a new entry in the cache
    assert len(ri.keys(pattern)) == 2

    execute_update("test", "insert into test values")
    # validate there is no new entry as execute_update is specified not to cache
    assert len(ri.keys(pattern)) == 2

    flush_cache()
    # validate the cache is flushed
    assert len(ri.keys(pattern)) == 0
# Run the integration test when this file is executed as a script.
if __name__ == "__main__":
    test_run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment