Created
August 31, 2011 12:11
-
-
Save ask/1183399 to your computer and use it in GitHub Desktop.
traverse celery results
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from collections import deque | |
from celery.result import BaseAsyncResult, TaskSetResult | |
from celery.task import chord, task, TaskSet | |
def force_list(l): | |
if not isinstance(l, (list, tuple)): | |
return [l] | |
return l | |
def traverse(start): | |
stack = deque([start]) | |
while stack: | |
for subres in force_list(stack.popleft()): | |
if isinstance(subres, TaskSetResult): | |
stack.append(subres.join()) | |
elif isinstance(subres, BaseAsyncResult): | |
stack.append(subres.get()) | |
else: | |
yield subres | |
@task | |
def tA(): | |
return tB.apply_async() | |
@task | |
def tB(): | |
return TaskSet(tC.subtask((i, )) for i in xrange(30)).apply_async() | |
@task | |
def tC(i): | |
return chord(tD.subtask((i, )) for i in xrange(i))(tS.subtask()) | |
@task | |
def tD(i): | |
return i ** i | |
@task | |
def tS(numbers): | |
return sum(numbers) | |
def test(): | |
for res in traverse(tA.apply_async()): | |
print(res) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment