-
-
Save stungkit/ef5798572cd26939ca238fdcb2a3bdf0 to your computer and use it in GitHub Desktop.
Sample Celery chain usage for a processing pipeline
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from celery import chain | |
from django.core.management.base import BaseCommand | |
from . import tasks | |
class Command(BaseCommand):
    """Management command that queues the data-processing pipeline.

    The pipeline is a Celery chain of four tasks from the sibling ``tasks``
    module: fetch -> blacklist -> transform -> load.
    """

    def handle(self, *args, **kwargs):
        """Queue the processing chain for the given source file.

        The first positional argument is the path of the source file handed
        to the ``fetch`` task. The chain is submitted with ``apply_async``,
        so this method returns as soon as the work is queued; it does not
        wait for the tasks to run.

        Raises:
            CommandError: if no source file argument was supplied.
        """
        # Imported here so this block stays self-contained and does not
        # rely on changes to the module-level import lines.
        from django.core.management.base import CommandError

        # Fail with a readable message instead of a bare IndexError.
        if not args:
            raise CommandError(
                "A source file path is required as the first argument"
            )

        source_file = args[0]
        chain(
            tasks.fetch.s(source_file),  # Fetch data from remote source
            tasks.blacklist.s(),  # Remove blacklisted records
            tasks.transform.s(),  # Transform raw data ready for loading
            tasks.load.s(),  # Load into DB
        ).apply_async()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import shutil | |
import os | |
from celery import task | |
@task()
def fetch(fixture_path, destination="/tmp/source.csv"):
    """
    Fetch a file from a remote location

    In this sample the "fetch" is a plain local file copy: the file at
    ``fixture_path`` is copied to ``destination``.

    Args:
        fixture_path: path of the file to copy.
        destination: path to copy to. Defaults to ``/tmp/source.csv``, the
            location the original implementation always used.

    Returns:
        The ``destination`` path, so a following task in a chain can pick
        the file up.
    """
    # Parenthesised single-argument form prints the same text under both
    # Python 2 and Python 3.
    print("Fetching data from %s - saving to %s" % (fixture_path, destination))
    shutil.copyfile(fixture_path, destination)
    return destination
@task()
def blacklist(source_path):
    """
    Produce the post-blacklist copy of a source file

    In this sample no records are actually filtered: the file is copied
    unchanged to a sibling path with ``-afterblacklist`` inserted before
    the extension (``/tmp/source.csv`` -> ``/tmp/source-afterblacklist.csv``).

    Args:
        source_path: path of the file to process.

    Returns:
        The path of the newly written file.
    """
    base, ext = os.path.splitext(source_path)
    destination = "%s-afterblacklist%s" % (base, ext)
    # The message previously said "Transforming", copied from transform();
    # it now names the step that is actually running.
    print("Blacklisting data in %s to %s" % (source_path, destination))
    shutil.copyfile(source_path, destination)
    return destination
@task()
def transform(source_path):
    """
    Produce the transformed copy of a source file

    In this sample no transformation is actually applied: the file is
    copied unchanged to a sibling path with ``-transformed`` inserted
    before the extension (``/tmp/a.csv`` -> ``/tmp/a-transformed.csv``).

    Args:
        source_path: path of the file to process.

    Returns:
        The path of the newly written file.
    """
    base, ext = os.path.splitext(source_path)
    destination = "%s-transformed%s" % (base, ext)
    # Parenthesised single-argument form prints the same text under both
    # Python 2 and Python 3.
    print("Transforming data in %s to %s" % (source_path, destination))
    shutil.copyfile(source_path, destination)
    return destination
@task()
def load(filepath):
    """
    Final pipeline step: report the file and delete it

    In this sample nothing is loaded anywhere; the task only logs the path
    and removes the file.

    Args:
        filepath: path of the file to remove.

    Returns:
        None.
    """
    # Parenthesised single-argument form prints the same text under both
    # Python 2 and Python 3.
    print("Loading data in %s and removing" % filepath)
    os.remove(filepath)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment