Created
March 15, 2014 22:08
-
-
Save LucasRoesler/9574647 to your computer and use it in GitHub Desktop.
Demonstration of how to create cron-like periodic tasks using Python RQ (http://python-rq.org/), django_rq (https://github.com/ui/django-rq), and rq_scheduler (https://github.com/ui/rq-scheduler)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# put in app/management/commands | |
from datetime import datetime | |
import importlib | |
import logging | |
from optparse import make_option | |
from django.conf import settings | |
from django.core.management.base import BaseCommand | |
import django_rq | |
logger = logging.getLogger(__name__) | |
def import_function(path):
    """
    Given the fully qualified path to the function, return that function.

    ``path`` should be of the form ``module.submodule...submodule.function``.

    Raises ``ValueError`` when ``path`` has no module part (no period, or
    nothing before the last period), whatever ``importlib.import_module``
    raises when the module cannot be imported, and ``AttributeError`` when
    the module has no attribute named after the function.
    """
    # everything before the last period is the module path, everything after
    # it is the function name
    module_path, _, function_name = path.rpartition('.')
    if not module_path:
        # fail early with a message that names the offending path instead of
        # importlib's generic "Empty module name"
        raise ValueError(
            "Expected a dotted path of the form 'module.function', "
            "got {!r}".format(path)
        )
    lib = importlib.import_module(module_path)
    return getattr(lib, function_name)
class Command(BaseCommand):
    """
    Runs RQ scheduler
    """
    # NOTE: the class docstring doubles as the command's help text, so keep
    # longer explanations in comments rather than in the docstring.
    help = __doc__

    option_list = BaseCommand.option_list + (
        make_option(
            '--verbose',
            dest='verbose',
            action="store_true",
            default=False,
            help="If True, it will print the list of scheduled tasks.",
        ),
        make_option(
            '--queue',
            dest='queue',
            default='default',
            help='Queue that scheduled tasks will be sent to.'
        )
    )

    def handle(self, task_module=None, *args, **options):
        """
        (Re)schedule every task listed in ``settings.RQ_PERIODIC``.

        Jobs already known to the scheduler whose ``func_name`` matches one
        of the configured task paths are cancelled first, then each
        configured task is scheduled again starting now and repeating
        forever.

        ``task_module`` is accepted but not used. The ``verbose`` and
        ``queue`` options are read from ``options``.
        """
        verbose = options.get('verbose')
        queue = options.get('queue')
        scheduler = django_rq.get_scheduler(queue)

        # cancel any currently scheduled tasks that are listed in the
        # settings. Cancelling goes through the scheduler so the job is
        # removed from the scheduler's own set of scheduled jobs.
        func_names = set(j['task'] for j in settings.RQ_PERIODIC)
        # materialise the jobs before cancelling any of them
        for j in list(scheduler.get_jobs()):
            if j.func_name in func_names:
                scheduler.cancel(j)

        for task in settings.RQ_PERIODIC:
            # work on a copy: popping from the dict held by settings would
            # break any later run of this command in the same process
            task = dict(task)
            # grab the function path, remove it from the dictionary
            t = task.pop('task')
            # add the function object to the dictionary
            # and set repeat to None, the task will repeat forever.
            task.update({
                'func': import_function(t),
                'repeat': None
            })
            # add the task to the schedule; the scheduler expects the time of
            # the first execution in UTC, not in local time
            scheduler.schedule(
                scheduled_time=datetime.utcnow(),
                **task
            )
            # verbose logging
            if verbose:
                # get_jobs() may return an iterator, len() needs a list
                scheduled = list(scheduler.get_jobs())
                logger.info("Adding {} to scheduled tasks.".format(t))
                logger.info("Currently scheduled tasks ({}):\n {}".format(
                    len(scheduled),
                    '\n\t'.join([j.func_name for j in scheduled])
                ))
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# add the following to your django settings | |
# settings describing the tasks that should be run by the rqperiodic management
# command. A list of dicts of the form | |
# { | |
# 'task': fully qualified path to the task, | |
# 'args': list of arguments to pass to the task, | |
# 'kwargs': a dict of kwargs to pass, | |
# 'interval': seconds between runs, | |
# 'timeout': task timeout, use -1 to allow unlimited time, | |
# 'result_ttl': seconds the result is kept, | |
# } | |
# Each 'task' is the full dotted path of the function, including the module
# it is defined in (here the app's tasks module).
RQ_PERIODIC = [
    {
        'task': 'app.tasks.my_hourly_task',
        'args': [],
        'kwargs': {},
        'interval': 3600,
        'timeout': -1,
        'result_ttl': 86400,
    },
    {
        'task': 'app.tasks.my_hourly_task_w_args',
        'args': [],
        'kwargs': {'name': 'Bob'},
        'interval': 3600,
        'timeout': -1,
        'result_ttl': 86400,
    },
]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# I put all of my functions that will be run via RQ into a tasks.py file, one per app.
from datetime import datetime | |
from django_rq import job | |
# use a negative timeout for tasks that should run without a time limit.
@job('default', timeout=-1)
def my_hourly_task():
    """Print a fixed test message followed by the current local time."""
    # a single pre-formatted string prints the same text on Python 2 and 3,
    # whereas the print statement is a syntax error on Python 3
    print("This is a simple test {}".format(datetime.now()))
@job('default')
def my_hourly_task_w_args(name=None):
    """
    Print an hourly-task message.

    When ``name`` is truthy the message is attributed to that name,
    otherwise a generic message is printed.
    """
    # parenthesised single-argument print works on both Python 2 and 3
    if name:
        print("{}'s hourly task".format(name))
    else:
        print("Another hourly task")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment