Demonstration of a task that runs a startup task, then fans out multiple worker tasks in parallel, and finally fires off a reducer task.
If passing results between steps were important, a chord could be used for task2 and task3 instead.
| from time import sleep | |
| from celery.utils.log import get_task_logger | |
| from celery.decorators import task | |
| from celery import group, chain | |
| from django.apps import apps | |
| from django.utils import timezone | |
| logger = get_task_logger(__name__) | |
@task()
def execute_analysis(id_, n_workers=10):
    """Run the full pipeline: startup -> parallel fan-out -> reducer.

    Args:
        id_: Identifier passed to the startup and reducer tasks.
        n_workers: Number of parallel worker tasks to fan out
            (default 10, preserving the original behavior).

    Returns:
        The AsyncResult produced by applying the chain.

    NOTE: a chain containing a group followed by another task is
    automatically upgraded by Celery to a chord, which requires a
    result backend to be configured.
    """
    # .si() makes immutable signatures: no step receives the
    # previous step's return value.
    startup = startup_task.si(id_)
    workers = group(parallel_task.si(i) for i in range(n_workers))
    reducer = reducer_task.si(id_)
    return chain(startup, workers, reducer)()
@task()
def startup_task(id_):
    """First pipeline step: simulates startup work with a 2-second sleep."""
    begin_msg = '-----> starter task started'
    end_msg = '-----> starter task complete'
    logger.info(begin_msg)
    sleep(2)
    logger.info(end_msg)
@task()
def parallel_task(id_):
    """One fan-out worker: logs around a simulated 2-second job."""
    started = '-------> parallel task started %s' % id_
    finished = '-------> parallel task complete %s' % id_
    logger.info(started)
    sleep(2)
    logger.info(finished)
# Consistency fix: siblings use @task() (called decorator); the original
# used bare @task here. Normalized so all tasks are declared identically.
@task()
def reducer_task(id_):
    """Final pipeline step: simulates reduction work with a 2-second sleep."""
    logger.info('-----> reducer task started')
    sleep(2)
    logger.info('-----> reducer task complete')