break a big celery job into smaller, batched chunks
""" | |
Celery tasks that batch a job with many tasks into smaller work sets. | |
The problem I'm attempting to solve is one where a job comprised of many | |
tasks (say 100) will snub out a job comprised of only a few tasks (say 5). It | |
appears as though by default celery will queue up the second job's 5 tasks | |
behind the first job's 100 and it will have to wait until the first job's | |
completion before it even begins. | |
This prototype code shows how jobs can be batched into smaller task sizes in | |
order to prevent a massive job from taking over all processing resources. | |
Caveat: as the code is written, a job will not detect a subtask erroring out | |
and will continue processing the job to completion. This may or may not be | |
desired. | |
""" | |
import errno
import os
import random
import time

from logging import info

from celery.task import chord, task
from celery.task.sets import TaskSet

WORK_DIR = '/tmp/work_done'  # where work output is written

# This number should be chosen relative to the number of workers in the pool,
# so that multiple jobs coming into the system don't have to wait too long to
# start processing. Making this number too low leaves resources idle when
# there are no additional jobs, but making it too high adds a startup penalty
# to any job entering the system.
ITEMS_PER_BATCH = 5
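
# For illustration (made-up numbers): with 10 workers in the pool and
# ITEMS_PER_BATCH = 5, a 100-task job is dispatched as 20 batches and ties up
# at most 5 workers per chord round, leaving the rest free for any other jobs
# that arrive in the meantime.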


@task
def prep_job(job_id, num_tasks):
    """
    Prepare by creating sets of tasks, each made up of <ITEMS_PER_BATCH> items
    """
    # create a list of numbers that represent the work items
    numbers = range(num_tasks)

    # a list of lists representing the work items in batches
    batches = get_batches(numbers, ITEMS_PER_BATCH)

    # calling job_dispatch will start processing the job one batch at a time
    return job_dispatch.delay(None, job_id, batches)


@task
def job_dispatch(results, job_id, batches):
    """
    Process the job's batches one at a time.

    When there is more than one batch left to process, a chord is used to
    delay the execution of the remaining batches.
    """
    batch = batches.pop(0)
    info('dispatching job_id: {0}, batch: {1}, results: {2}'.format(
        job_id, batch, results))

    tasks = [job_worker.subtask((job_id, task_num)) for task_num in batch]

    # when there are more batches to process, use a chord to delay the
    # execution of the remaining tasks; otherwise, finish off with a TaskSet
    if batches:
        info('still have batches, chording {0}'.format(batches))
        callback = job_dispatch.subtask((job_id, batches))
        return chord(tasks)(callback)
    else:
        info('last batch, calling TaskSet')
        return TaskSet(tasks=tasks).apply_async()
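
# How the dispatch above plays out for, say, a 12-task job with
# ITEMS_PER_BATCH = 5 (an illustrative walk-through, not extra code):
#
#   prep_job        -> batches [[0..4], [5..9], [10, 11]]
#   job_dispatch #1 -> chord over tasks 0-4, callback: job_dispatch
#   job_dispatch #2 -> chord over tasks 5-9, callback: job_dispatch
#   job_dispatch #3 -> TaskSet over tasks 10-11 (nothing left to chain)
#
# A chord callback receives the list of results from the chord's tasks as its
# first positional argument; that is why job_dispatch's signature starts with
# `results` and why prep_job passes None for it on the initial call.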


@task
def job_worker(job_id, task_num):
    return do_the_work(job_id, task_num)


#
# Helper methods
#

def get_batches(work, batch_size):
    """Split the `work` list into consecutive lists of at most `batch_size`"""
    batches = []
    while work:
        t_batch_size = min(len(work), batch_size)
        batches.append(work[:t_batch_size])
        work = work[t_batch_size:]
    return batches
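
# A quick sanity check of the helper above:
#
#   >>> get_batches(range(12), 5)
#   [[0, 1, 2, 3, 4], [5, 6, 7, 8, 9], [10, 11]]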


def do_the_work(job_id, task_num):
    """
    Example of a work unit that takes up to 2 seconds to complete.

    It writes to the WORK_DIR directory to demonstrate (by listing the
    directory) that other jobs are able to interleave their tasks with a big
    job's tasks.
    """
    sleep_time = random.random() * 2

    # fail every once in a while to see whether the entire job is cut short
    # after a failure.
    if sleep_time < 0.2:
        raise Exception('misc failure')

    info('job_worker, sleeping {0}'.format(sleep_time))
    time.sleep(sleep_time)
    now = time.time()

    # try/except just in case another worker got to creating the directory
    # first.
    try:
        if not os.path.exists(WORK_DIR):
            os.makedirs(WORK_DIR)
    except OSError as exc:
        if exc.errno != errno.EEXIST:  # errno 17: directory already exists
            raise

    path = os.path.join(WORK_DIR, '{0}-{1}-{2}.txt'.format(now, job_id, task_num))
    info('touching {0}'.format(path))
    open(path, 'wb').close()
    return path
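

# A minimal way to exercise the tasks above (a sketch: it assumes a configured
# broker and a running worker pool loaded with this module, neither of which
# is set up here; the job ids 'big' and 'small' are made-up labels).
if __name__ == '__main__':
    # kick off a 100-task job, then immediately a 5-task job; because the big
    # job is dispatched in ITEMS_PER_BATCH-sized chords, the small job's tasks
    # interleave with it instead of waiting behind all 100 tasks.
    prep_job.delay('big', 100)
    prep_job.delay('small', 5)
    # `ls /tmp/work_done` should then show files from both job ids mixed
    # together.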