Skip to content

Instantly share code, notes, and snippets.

@sinkers
Created September 5, 2014 02:36
Show Gist options
  • Save sinkers/d4e0bcac5f226e8bcc21 to your computer and use it in GitHub Desktop.
Save sinkers/d4e0bcac5f226e8bcc21 to your computer and use it in GitHub Desktop.
Celery chord issue
# tasks.py
from celery import task, Celery, subtask, group
import time
# The same Redis URL is used for both the message broker and the result backend.
REDIS_SERVER = "redis://localhost:6379"
celery = Celery(
    "app",
    broker=REDIS_SERVER,
    backend=REDIS_SERVER,
)
@task()
def task1(input):
    """Sleep for one second, then return ``input`` prefixed with ``"task1 "``."""
    time.sleep(1)
    prefix = "task1 "
    return prefix + input
@task()
def task2(input):
    """Sleep for one second, then return ``input`` prefixed with ``"task2 "``."""
    time.sleep(1)
    prefix = "task2 "
    return prefix + input
@task()
def task3(input):
    """Sleep for one second, then return ``input`` prefixed with ``"task3 "``."""
    time.sleep(1)
    prefix = "task3 "
    return prefix + input
@task()
def dmap(it, callback):
    """Fan ``callback`` out over every element of ``it`` as a group.

    ``callback`` is passed through ``subtask()`` and then cloned once per
    element, each clone receiving that element as its single positional
    argument. The clones are collected into a ``group``, the group is
    invoked immediately, and the value of that invocation is returned.
    """
    signature = subtask(callback)
    # One clone per item, each with a one-element argument list.
    fan_out = group(signature.clone([item]) for item in it)
    return fan_out()
# run.py
from celery import chord
import tasks
params = list("ABCD")
# Lazily yields one task1 signature per parameter.
task_1 = (tasks.task1.s(letter) for letter in params)
# dmap signature that carries the task2 signature as its argument.
task_2 = tasks.dmap.s(tasks.task2.s())
task_3 = tasks.task3.s()
# NOTE(review): the chord header below is a two-element list holding a
# generator (task_1) and a single signature (task_2), not a flat iterable of
# signatures -- confirm this is the intended shape for the chord.
workflow = chord([task_1, task_2])(task_3)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment