Created November 21, 2012 13:46
Sample Celery chain usage for processing pipeline
from celery import chain
from django.core.management.base import BaseCommand

from . import tasks


class Command(BaseCommand):

    def handle(self, *args, **kwargs):
        source_file = args[0]
        chain(
            tasks.fetch.s(source_file),  # Fetch data from remote source
            tasks.blacklist.s(),         # Remove blacklisted records
            tasks.transform.s(),         # Transform raw data ready for loading
            tasks.load.s(),              # Load into DB
        ).apply_async()

import shutil
import os

# Celery 3.x-era import; newer Celery releases use shared_task or app.task
from celery import task


@task()
def fetch(fixture_path):
    """
    Fetch a file from a remote location
    """
    destination = "/tmp/source.csv"
    print("Fetching data from %s - saving to %s" % (fixture_path, destination))
    shutil.copyfile(fixture_path, destination)
    return destination


@task()
def blacklist(source_path):
    base, ext = os.path.splitext(source_path)
    destination = "%s-afterblacklist%s" % (base, ext)
    print("Blacklisting data in %s to %s" % (source_path, destination))
    shutil.copyfile(source_path, destination)
    return destination


@task()
def transform(source_path):
    base, ext = os.path.splitext(source_path)
    destination = "%s-transformed%s" % (base, ext)
    print("Transforming data in %s to %s" % (source_path, destination))
    shutil.copyfile(source_path, destination)
    return destination


@task()
def load(filepath):
    print("Loading data in %s and removing" % filepath)
    os.remove(filepath)

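The pipeline relies on each task's return value being passed as the first argument of the next signature in the chain, which is why only fetch is given an explicit argument. A plain-Python sketch of that hand-off, with no Celery and no file access involved; `run_pipeline_locally`, `fake_fetch` and `add_suffix` are hypothetical names used only for illustration:

```python
import os
from functools import reduce


def run_pipeline_locally(steps, first_argument):
    """Call each step with the previous step's return value."""
    return reduce(lambda value, step: step(value), steps, first_argument)


def fake_fetch(fixture_path):
    # Stand-in for fetch(): ignores the source and reports the saved path.
    return "/tmp/source.csv"


def add_suffix(suffix):
    # Stand-in for blacklist()/transform(): only derives the new file name.
    def step(source_path):
        base, ext = os.path.splitext(source_path)
        return "%s-%s%s" % (base, suffix, ext)
    return step


final_path = run_pipeline_locally(
    [fake_fetch, add_suffix("afterblacklist"), add_suffix("transformed")],
    "fixture.csv",
)
print(final_path)
```

The path printed at the end is the one that load() would receive and delete in the real pipeline.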
Hi, instead of @task I use @shared_task. When I chain my tasks with
chain(fetch.subtask(resource_id), process.subtask())
I get an error:
TypeError: 'int' object is not iterable
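This error is consistent with subtask() receiving a bare integer where it expects a sequence of positional arguments. Passing a tuple, as in fetch.subtask((resource_id,)), or using the star-args shortcut fetch.s(resource_id) that the gist itself uses, would likely avoid it. A minimal sketch of the failure mode, assuming the arguments are normalised with tuple(...); `make_signature` and `shortcut` are hypothetical stand-ins, not Celery code:

```python
def make_signature(args=None):
    """Hypothetical stand-in: store positional args as a tuple."""
    return tuple(args or ())


def shortcut(*args):
    """Hypothetical stand-in for the star-args style, like `.s(...)`."""
    return make_signature(args)


resource_id = 42  # illustrative value

try:
    make_signature(resource_id)  # bare int, like fetch.subtask(resource_id)
    error = None
except TypeError as exc:
    error = str(exc)

as_tuple = make_signature((resource_id,))  # like fetch.subtask((resource_id,))
via_shortcut = shortcut(resource_id)       # like fetch.s(resource_id)

print(error)
print(as_tuple, via_shortcut)
```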
Hi!
Is there a way to revoke all the tasks in the chain?
from <backend>.celery import app
revoke = app.control.revoke
revoke(<chain_id>, terminate=True)
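One possible approach, assuming the object returned by chain(...).apply_async() is the result of the last task and that earlier results are reachable through its parent attribute: walk the parents and revoke each one. The sketch below only relies on objects that have an id, a parent and a revoke() method, so it runs here against a hypothetical `FakeResult` stand-in instead of a live broker. Whether a revoked id actually stops a task that has not been dispatched yet depends on the workers, so treat this as a starting point.

```python
def revoke_chain(result, terminate=False):
    """Revoke a result and every ancestor reachable through `.parent`.

    `result` is assumed to behave like celery.result.AsyncResult: it has an
    `id`, a `revoke(terminate=...)` method, and a `parent` that is None for
    the first task in the chain.
    """
    revoked_ids = []
    node = result
    while node is not None:
        node.revoke(terminate=terminate)
        revoked_ids.append(node.id)
        node = node.parent
    return revoked_ids


class FakeResult:
    """Hypothetical stand-in so the sketch runs without a broker."""

    def __init__(self, id, parent=None):
        self.id = id
        self.parent = parent
        self.revoked = False

    def revoke(self, terminate=False):
        self.revoked = True


fetch_result = FakeResult("fetch-id")
transform_result = FakeResult("transform-id", parent=fetch_result)
load_result = FakeResult("load-id", parent=transform_result)

order = revoke_chain(load_result, terminate=True)
print(order)
```

With a real chain, the call would be revoke_chain(chain(...).apply_async(), terminate=True).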
Hi! Could a task in the chain launch other tasks? For example, I want to split the raw data into chunks in transform() and process the chunks in parallel with other Celery tasks.
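One way this might be done is with a chord: a group of per-chunk tasks that run in parallel, followed by a callback that receives the list of their results. The sketch below is an assumption about how that could look, not part of the gist; `split_into_chunks` and `build_fan_out` are hypothetical helpers, and `process_chunk` and `combine` stand for Celery tasks you would define yourself. Chords need a result backend to be configured.

```python
def split_into_chunks(rows, chunk_size):
    """Split a list into consecutive chunks of at most chunk_size items."""
    if chunk_size < 1:
        raise ValueError("chunk_size must be at least 1")
    return [rows[i:i + chunk_size] for i in range(0, len(rows), chunk_size)]


def build_fan_out(rows, chunk_size, process_chunk, combine):
    """Build (but do not send) a chord over the chunks.

    process_chunk and combine are hypothetical Celery tasks supplied by the
    caller: process_chunk handles one chunk, combine receives the list of
    per-chunk results once all of them have finished.
    """
    # Imported here so the chunking helper can be used without Celery installed.
    from celery import chord

    header = [
        process_chunk.s(chunk)
        for chunk in split_into_chunks(rows, chunk_size)
    ]
    return chord(header, combine.s())


chunks = split_into_chunks(list(range(7)), 3)
print(chunks)
```

Calling apply_async() on the object returned by build_fan_out would dispatch the per-chunk tasks.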