Last active
December 21, 2015 10:40
-
-
Save brouberol/051447e65550bf1bb1a3 to your computer and use it in GitHub Desktop.
ChainedTask celery abstract task
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from celery import Task | |
class ChainedTask(Task): | |
abstract = True | |
def apply_async(self, args, kwargs, **options): | |
"""Injects the dict return value of a a task as keyword args of the next one.""" | |
if args: | |
# handle the case of a bound task (decorated with bind=True) | |
if isinstance(args[0], Task) | |
if len(args) > 1 and isinstance(args[1], dict): | |
task_kwargs = args[1] | |
args.pop(1) | |
else: | |
task_kwargs = {} | |
else: | |
if isinstance(args[0], dict): | |
task_kwargs = args[0] | |
args.pop(0) | |
else: | |
task_kwargs = {} | |
kwargs.update(task_kwargs) | |
return super(ChainedTask, self).apply_async(args, kwargs, **options) | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment