Skip to content

Instantly share code, notes, and snippets.

Forked from codeinthehole/
Created December 14, 2022 01:35
Show Gist options
  • Save stungkit/ef5798572cd26939ca238fdcb2a3bdf0 to your computer and use it in GitHub Desktop.
Save stungkit/ef5798572cd26939ca238fdcb2a3bdf0 to your computer and use it in GitHub Desktop.
Sample Celery chain usage for processing pipeline
from celery import chain
from import BaseCommand
from . import tasks
class Command(BaseCommand):
    """Command that queues the data-processing pipeline as a Celery chain."""

    def handle(self, *args, **kwargs):
        """Build the fetch -> blacklist -> transform -> load chain and queue it.

        The first positional argument is handed to ``tasks.fetch`` as the
        source file; ``IndexError`` is raised when no argument is given.
        The remaining signatures are created without arguments, so in a
        chain each one receives the previous task's return value.
        """
        source_file = args[0]
        # The signatures used to be built as bare expressions and thrown
        # away; they only run once wrapped in chain() and dispatched.
        pipeline = chain(
            tasks.fetch.s(source_file),  # Fetch data from remote source
            tasks.blacklist.s(),  # Remove blacklisted records
            tasks.transform.s(),  # Transform raw data ready for loading
            tasks.load.s(),  # Load into DB
        )
        pipeline.delay()
import shutil
import os
from celery import task
def fetch(fixture_path, destination="/tmp/source.csv"):
    """
    Fetch the source file into a local working copy.

    Copies ``fixture_path`` to ``destination`` with ``shutil.copyfile``
    (replacing any file already there) and returns ``destination``.
    ``destination`` defaults to the path that was previously hard-coded.
    """
    # NOTE(review): no Celery task decorator is visible on this function,
    # yet the pipeline calls ``.s()`` on it - confirm it is registered.
    # Single-argument call form of print: same output on Python 2, and
    # valid syntax on Python 3 (the print statement is not).
    print("Fetching data from %s - saving to %s" % (fixture_path, destination))
    shutil.copyfile(fixture_path, destination)
    return destination
def blacklist(source_path):
    """
    Produce the post-blacklist copy of ``source_path``.

    The file is copied unchanged to a sibling path with ``-afterblacklist``
    inserted before the extension (``a/b.csv`` -> ``a/b-afterblacklist.csv``);
    no records are filtered in this block.  Returns the new path.
    """
    # NOTE(review): no Celery task decorator is visible on this function,
    # yet the pipeline calls ``.s()`` on it - confirm it is registered.
    base, ext = os.path.splitext(source_path)
    destination = "%s-afterblacklist%s" % (base, ext)
    # The message used to read "Transforming", copied from transform(),
    # which made the two steps indistinguishable in the output.
    print("Blacklisting data in %s to %s" % (source_path, destination))
    shutil.copyfile(source_path, destination)
    return destination
def transform(source_path):
    """
    Produce the transformed copy of ``source_path``.

    The file is copied unchanged to a sibling path with ``-transformed``
    inserted before the extension (``a/b.csv`` -> ``a/b-transformed.csv``);
    no content is rewritten in this block.  Returns the new path.
    """
    # NOTE(review): no Celery task decorator is visible on this function,
    # yet the pipeline calls ``.s()`` on it - confirm it is registered.
    base, ext = os.path.splitext(source_path)
    destination = "%s-transformed%s" % (base, ext)
    # Single-argument call form of print: same output on Python 2, and
    # valid syntax on Python 3 (the print statement is not).
    print("Transforming data in %s to %s" % (source_path, destination))
    shutil.copyfile(source_path, destination)
    return destination
def load(filepath):
    """
    Report that ``filepath`` is being loaded.

    Only the message is printed by the code in this block.  Returns ``None``.
    """
    # NOTE(review): the message says "and removing", but no removal (and no
    # actual load) is visible here - confirm whether a cleanup step is
    # missing before relying on this to delete anything.
    # Single-argument call form of print: same output on Python 2, and
    # valid syntax on Python 3 (the print statement is not).
    print("Loading data in %s and removing" % filepath)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment