Last active
October 21, 2015 18:36
-
-
Save dangarthwaite/8ec2413fb1af3e480902 to your computer and use it in GitHub Desktop.
Skeleton Luigi pipeline example.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# encoding: utf-8 | |
import luigi | |
import os | |
class Validate(luigi.Task):
    """Validate the data for a single region.

    Skeleton implementation: it only echoes the region name to stdout and
    writes the region name into this task's output file. Real validation
    logic still needs to be added to run().
    """

    # Region identifier; also used as the per-region directory under /tmp/data.
    region = luigi.Parameter()

    def run(self):
        """Echo the region and record it in the validation log."""
        with self.output().open('w') as out:
            # Single-argument print(...) produces the same output on Python 2
            # and Python 3, unlike the Python 2-only print statement.
            print("Validate {}".format(self.region))
            out.write(self.region)

    def output(self):
        """Return the per-region validation log written by run()."""
        return luigi.LocalTarget('/tmp/data/{}/validate.log'.format(self.region))
class ImportGroups(luigi.Task):
    """Import the groups of one region once that region has been validated.

    Skeleton implementation: run() only writes a one-line message to the
    task's log file; the actual group-import logic is still to be written.
    """

    # Region to import; forwarded unchanged to the Validate dependency.
    region = luigi.Parameter()

    def output(self):
        """Return the per-region groups log produced by run()."""
        log_path = '/tmp/data/{}/groups.log'.format(self.region)
        return luigi.LocalTarget(log_path)

    def requires(self):
        """Depend on Validate for the same region."""
        return Validate(region=self.region)

    def run(self):
        """Record that groups were imported for this region."""
        with self.output().open('w') as log_file:
            message = "Importing groups: {}".format(self.region)
            log_file.write(message)
class ImportUsers(luigi.Task):
    """Import the users of one region once its groups have been imported.

    Skeleton implementation: run() only writes a one-line message to the
    task's log file; the actual user-import logic is still to be written.
    """

    # Region to import; forwarded unchanged to the ImportGroups dependency.
    region = luigi.Parameter()

    def output(self):
        """Return the per-region users log produced by run()."""
        log_path = '/tmp/data/{}/users.log'.format(self.region)
        return luigi.LocalTarget(log_path)

    def requires(self):
        """Depend on ImportGroups for the same region."""
        return ImportGroups(region=self.region)

    def run(self):
        """Record that users were imported for this region."""
        with self.output().open('w') as log_file:
            message = "Importing users: {}".format(self.region)
            log_file.write(message)
class Fetch(luigi.Task):
    """Discover incoming user files and list them as regions.

    Ensures /tmp/data exists, scans /tmp/incoming for file names ending in
    '_users.csv', and writes one entry per match (one per line, sorted) to
    /tmp/data/regions.log.
    """

    def run(self):
        """Create the data directory and write the discovered region list."""
        # 0o770 (rwxrwx---) parses on Python 2.6+ and 3; the bare 0770
        # literal is a SyntaxError on Python 3.
        try:
            os.mkdir('/tmp/data', 0o770)
        except OSError:
            # Only "directory already exists" is expected. Re-raise anything
            # else (permissions, a regular file in the way, ...) rather than
            # silently continuing as the original bare `pass` did.
            if not os.path.isdir('/tmp/data'):
                raise

        # NOTE(review): each entry is the full file name (e.g.
        # 'east_users.csv'), yet ImportRegions hands it to ImportUsers, which
        # uses it as a directory name under /tmp/data. Confirm whether the
        # '_users.csv' suffix should be stripped to get a bare region name.
        # sorted() makes the output order deterministic (set order is not).
        regions = sorted(
            name for name in os.listdir('/tmp/incoming')
            if name.endswith('_users.csv')
        )

        with self.output().open('w') as out:
            for region in regions:
                # Code here to transfer files to /tmp/data
                out.write(region + '\n')

    def output(self):
        """Return the file listing one discovered region per line."""
        return luigi.LocalTarget('/tmp/data/regions.log')
class ImportRegions(luigi.Task):
    """Top-level task: run the ImportUsers chain for every fetched region.

    Reads the region list written by Fetch, schedules ImportUsers for every
    non-blank region as dynamic dependencies, and once those are done writes
    the processed region names, one per line, to /tmp/data/import.log.
    """

    def requires(self):
        """Depend on Fetch, which produces the region list."""
        return Fetch()

    def run(self):
        """Schedule ImportUsers for each region, then record the regions."""
        # Read the region list up front and close the handle immediately;
        # previously it was never closed.
        with self.input().open('r') as region_file:
            stripped = [line.strip() for line in region_file]
        # Skip blank lines: an empty region would map to '/tmp/data//...'.
        regions = [region for region in stripped if region]

        # Yield all dependencies in one batch so luigi can schedule them
        # together, and do it BEFORE opening the output. With luigi's dynamic
        # dependencies, run() may be abandoned at a yield and restarted from
        # the beginning later, so no output writer is held open across it.
        yield [ImportUsers(region) for region in regions]

        with self.output().open('w') as out:
            for region in regions:
                out.write(region + '\n')

    def output(self):
        """Return the log of regions whose import chain has completed."""
        return luigi.LocalTarget('/tmp/data/import.log')
if __name__ == '__main__':
    # Entry point: luigi.run parses command-line arguments, with
    # ImportRegions as the default task, and uses the in-process local
    # scheduler instead of a central scheduler daemon.
    luigi.run(
        main_task_cls=ImportRegions,
        local_scheduler=True,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
In case it isn't obvious: every `def run(self):` method still needs actual business logic.