-
-
Save jimmytuc/ac95ceaa221d8be8e34733e1697120a2 to your computer and use it in GitHub Desktop.
Example ETL Using Luigi
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# import python core modules | |
import datetime | |
import logging | |
# import external modules | |
import pandas as pd | |
import requests | |
# import luigi modules | |
import luigi | |
from luigi.contrib import redshift, s3 | |
# Shared module logger; the name matches the `luigi-interface` logger declared in logging.conf.
logger = logging.getLogger("luigi-interface")

# Module metadata.
__author__ = "Dillon Stadther"
__date__ = "2018-01-30"

# strftime patterns at day, hour and minute granularity; each one extends the previous.
DATE_FORMAT = "%Y-%m-%d"
DATEHOUR_FORMAT = DATE_FORMAT + "T%H"
DATEMINUTE_FORMAT = DATEHOUR_FORMAT + "%M"
class path(luigi.Config):
    """Local and S3 locations, read from the ``[path]`` section of luigi.cfg.

    The parameter names must match the keys in that section. The accompanying
    luigi.cfg defines ``tos3_path`` (not ``tos3``), so the parameter is declared
    under that name; ``tos3`` is kept as a read-only alias so existing
    ``path().tos3`` call sites keep working.
    """

    # Local directory where CSV files are staged before being uploaded to S3.
    tos3_path = luigi.Parameter()
    # S3 URL prefix (with trailing slash) that staged files are uploaded under.
    s3_load_bucket = luigi.Parameter()

    @property
    def tos3(self):
        """Backward-compatible alias for ``tos3_path``."""
        return self.tos3_path
class redshift(luigi.Config):
    """Redshift connection settings, pulled from the section of luigi.cfg named after this class.

    Every value falls back to an empty string when the section or key is absent.
    """

    # NOTE: this class rebinds the module-level name `redshift`, hiding the
    # `luigi.contrib.redshift` module imported at the top of the file.
    host = luigi.Parameter(default="")
    database = luigi.Parameter(default="")
    user = luigi.Parameter(default="")
    password = luigi.Parameter(default="")
class s3(luigi.Config):
    """AWS credentials, pulled from the section of luigi.cfg named after this class.

    Both values fall back to an empty string when the section or key is absent.
    """

    # NOTE: this class rebinds the module-level name `s3`, hiding the
    # `luigi.contrib.s3` module imported at the top of the file.
    aws_access_key_id = luigi.Parameter(default="")
    aws_secret_access_key = luigi.Parameter(default="")
class ExampleTask(luigi.WrapperTask):
    """Entry-point wrapper that schedules one RedshiftCopy per (table, file) pair."""

    # Day granularity on purpose: every downstream task declares `date` as a
    # DateParameter and the documented CLI usage passes `--date 'YYYY-MM-DD'`,
    # which a DateHourParameter cannot parse. The default is the current UTC
    # date, evaluated once when the module is imported.
    date = luigi.DateParameter(default=datetime.datetime.utcnow().date())

    def requires(self):
        """Yield the Redshift load tasks this wrapper depends on.

        Anything returned or yielded by requires() must report complete()
        (i.e. its output exists) before this task is considered runnable;
        as a WrapperTask this class has no work of its own beyond that.
        """
        # (destination table, source file name) pairs to load.
        jobs = (
            ('my_table', 'my_file'),
            ('your_table', 'your_file'),
        )
        for table, fn in jobs:
            yield RedshiftCopy(date=self.date, table=table, fn=fn)
class RedshiftCopy(luigi.contrib.redshift.S3CopyToTable):
    """Implementation of luigi.contrib.redshift.S3CopyToTable for one staged S3 file.

    The base class is referenced by its fully-qualified path because the
    module-level name ``redshift`` is rebound to the ``redshift`` config class
    defined in this file; ``redshift.S3CopyToTable`` would therefore raise
    AttributeError when this class statement executes. The ``redshift()`` and
    ``s3()`` calls below intentionally refer to those config classes.
    """

    date = luigi.DateParameter()
    fn = luigi.Parameter()

    # there are a lot more options available here (i.e. temp table copy, prune based on date or column, etc.)
    table_type = luigi.Parameter(default='')
    table = luigi.Parameter()
    queries = luigi.ListParameter(default=[])

    copy_options = "CSV IGNOREHEADER 1 BLANKSASNULL EMPTYASNULL TIMEFORMAT 'auto' DATEFORMAT 'auto'"

    # Connection settings and credentials are read from luigi.cfg once, when this class is defined.
    host = redshift().host
    database = redshift().database
    user = redshift().user
    password = redshift().password
    aws_access_key_id = s3().aws_access_key_id
    aws_secret_access_key = s3().aws_secret_access_key

    def s3_load_path(self):
        """Return the S3 path of the file to copy, taken from the single required ToS3 task."""
        # optional usages (for the variations noted in requires() below)
        # return self.input().path
        # return self.input()['s3'].path
        return self.input()[0].path

    def requires(self):
        """Require the upload of this task's file to S3 for the same date."""
        # optional variations are:
        # return ToS3(...)
        # return {'s3': ToS3(...)}
        return [
            ToS3(date=self.date, fn=self.fn)
        ]
class ToS3(luigi.Task):
    """Utilization of luigi.contrib.s3. Pushes a single input file to the designated S3 bucket.

    The contrib module is referenced by its fully-qualified path because the
    module-level name ``s3`` is rebound to the ``s3`` config class defined in
    this file, which has neither ``S3Client`` nor ``S3Target``.
    """

    date = luigi.DateParameter()
    fn = luigi.Parameter()

    # you do not have to specify parameters here b/c the S3Client() looks in your luigi.cfg
    client = luigi.contrib.s3.S3Client()

    @property
    def fn_src(self):
        """File name (last path component) of the local input file."""
        return self.input()[0].path.split('/')[-1]

    def requires(self):
        """Require the task that produces the local CSV for this date and file name."""
        return [
            ExampleBaseTask(date=self.date, fn=self.fn)
        ]

    def output(self):
        """Return the S3 target: the configured load-bucket prefix followed by the source file name."""
        # The property is named `fn_src`; the former `self.src_fn` raised AttributeError.
        return luigi.contrib.s3.S3Target(
            "{}{}".format(path().s3_load_bucket, self.fn_src),
            client=self.client
        )

    def run(self):
        """Upload the local input file to the output S3 path."""
        src = self.input()[0].path
        dest = self.output().path
        logger.info('Uploading %s to %s', src, dest)
        self.client.put(src, dest)
class ExampleBaseTask(luigi.Task):
    """Main task to do whatever. Sometimes this may also require other things.

    Here it fetches data from an HTTP endpoint and writes a selection of its
    columns to a dated CSV file in the local staging directory.
    """

    date = luigi.DateParameter()
    fn = luigi.Parameter()

    # since this task doesn't 'require' anything, that method can be defaulted to luigi.Task.requires()

    def output(self):
        """Return the local CSV target ``<tos3 dir>/<fn>_<YYYY-MM-DD>.csv``."""
        return luigi.LocalTarget(
            "{path}/{fn}_{date}.csv".format(
                path=path().tos3,
                fn=self.fn,
                date=self.date.strftime(DATE_FORMAT)
            )
        )

    def run(self):
        """Download the data, reshape it with pandas and write the output CSV.

        Raises:
            requests.HTTPError: if the endpoint answers with an error status.
        """
        import csv  # local import: only this method needs it

        response = requests.get('http://whatever.com/api/endpoint')
        # Fail the task on an HTTP error instead of writing an error page out as data.
        response.raise_for_status()

        # Placeholder parsing step (the original example left this line unfinished):
        # turn the response into a list of lists. This assumes the payload is
        # header-less CSV text with four fields per row -- TODO adapt to the real API.
        data = list(csv.reader(response.text.splitlines()))

        # convert to dataframe (the header is known up front)
        df = pd.DataFrame(data, columns=['col1', 'col2', 'col3', 'col4'])

        # you can now manipulate the data easily within the dataframe
        # one easy thing is selecting and reordering the desired columns
        df = df[['col2', 'col4', 'col1']]

        # write dataframe to output csv
        df.to_csv(self.output().path, index=False, encoding='utf-8')
if __name__ == '__main__':
    # Hand control to Luigi's command-line runner; to override a default parameter run e.g.
    #   python example_etl.py ExampleTask --date '2018-01-01'
    luigi.run()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[DEFAULT] | |
user_path: /home/my-user | |
[loggers] | |
keys=root, luigi-interface | |
[formatters] | |
keys=standardFormatter, consoleFormatter | |
[handlers] | |
keys=root, luigiHandler, consoleHandler | |
[logger_root] | |
level=ERROR | |
handlers=root | |
[logger_luigi-interface] | |
level=DEBUG | |
handlers=consoleHandler,luigiHandler | |
qualname=luigi-interface | |
propagate=0 | |
[formatter_standardFormatter] | |
format=%(asctime)s.%(msecs)03d %(name)-12s %(levelname)-8s %(message)s | |
datefmt=%y-%m-%d %H:%M:%S | |
[formatter_consoleFormatter] | |
format=%(levelname)s - %(message)s | |
datefmt= | |
[handler_root] | |
level=WARNING | |
class=handlers.TimedRotatingFileHandler | |
formatter=standardFormatter | |
args=("%(user_path)s/outputs/log/luigi-root.log","midnight",1,14) | |
[handler_luigiHandler] | |
class=handlers.TimedRotatingFileHandler | |
formatter=standardFormatter | |
args=("%(user_path)s/outputs/log/luigi.log","midnight",1,14) | |
[handler_consoleHandler] | |
class=StreamHandler | |
level=WARNING | |
formatter=consoleFormatter | |
args=(sys.stdout,) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[DEFAULT] | |
user_path: /home/my-user | |
[core] | |
default-scheduler-port: 8082 | |
error-email: <your_email>@<domain>.com | |
email-prefix: [LUIGI] | |
email-sender: <address the notification emails are sent from> | |
email-type: plain | |
max-reschedules: 1 | |
smtp_host: smtp.gmail.com | |
smtp_login: <same as email-sender> | |
smtp_password: <email sender password> | |
smtp_port: 587 | |
logging_conf_file: %(user_path)s/this-etl/logging.conf | |
[path] | |
tmp_path: %(user_path)s/outputs/tmp | |
tos3_path: %(user_path)s/outputs/tos3 | |
s3_load_bucket: s3://name.of.your.desired.bucket/ | |
s3_complete_bucket: s3://name.of.your.complete.bucket/ | |
[redshift] | |
host: <cluster_name>.<dns_to_your_redshift_cluster>:<port_num> | |
database: <db_name> | |
user: <db_user> | |
password: <db_password> | |
[s3] | |
aws_access_key_id: <key> | |
aws_secret_access_key: <secret> | |
calling_format: boto.s3.connection.OrdinaryCallingFormat |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment