Last active
June 9, 2021 07:35
-
-
Save dlstadther/f835a234f54a46728412 to your computer and use it in GitHub Desktop.
Using Luigi's Redshift and S3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# import python core modules
import datetime
import os
import shutil

# import luigi modules
import luigi
from luigi import configuration, s3
from luigi.contrib import redshift
# Aliases that stay bound to the luigi modules: the config classes named
# `redshift` and `s3` defined further down rebind the plain names.
from luigi.contrib import redshift as luigi_redshift
from luigi.contrib import s3 as luigi_s3
# meta data | |
__author__ = 'Dillon Stadther' | |
__date__ = '2016-01-06' | |
class redshift(luigi.Config):
    """Redshift connection settings pulled from luigi.cfg.

    Luigi's config system looks the values up under the section named after
    this class (``[redshift]``), which is why the class name is lowercase.
    Each setting falls back to an empty string when it is not configured.

    NOTE(review): defining this class rebinds the module-level name
    ``redshift`` that the file also imports from ``luigi.contrib``.
    """

    host = luigi.Parameter(default="")
    database = luigi.Parameter(default="")
    user = luigi.Parameter(default="")
    password = luigi.Parameter(default="")
class s3(luigi.Config):
    """AWS credentials pulled from luigi.cfg.

    Luigi's config system looks the values up under the section named after
    this class (``[s3]``), which is why the class name is lowercase.
    Each setting falls back to an empty string when it is not configured.

    NOTE(review): defining this class rebinds the module-level name ``s3``
    that the file also imports from ``luigi``.
    """

    aws_access_key_id = luigi.Parameter(default="")
    aws_secret_access_key = luigi.Parameter(default="")
class ExampleTask(luigi.WrapperTask):
    """Wrapper task that requires one MoveS3 task per table/file pair listed in requires()."""

    # Defaults to the current UTC calendar date; the default is computed once,
    # when the class body is executed.
    # overkill - if run on a machine using UTC time, then use:
    #     date = luigi.DateParameter(default=datetime.date.today())
    # NOTE: datetime.date() takes (year, month, day), so the date has to be
    # extracted from the datetime with .date() rather than passed to the constructor.
    date = luigi.DateParameter(default=datetime.datetime.utcnow().date())

    def requires(self):
        """
        Yield a MoveS3 task for every job in the list below.

        Anything returned or yielded by requires must have a 'true' complete() method (aka successful output)
        before this class's run method will execute.
        """
        jobs = [
            {'table': 'my_table', 'fn': 'my_file'},
            {'table': 'your_table', 'fn': 'your_file'},
        ]
        for job in jobs:
            yield MoveS3(date=self.date, table=job['table'], fn=job['fn'])
class MoveS3(luigi.Task):
    """Utilization of luigi.contrib.s3's S3Client.move() to move a single file from one s3 bucket to another.

    The file is moved from ``s3_load_bucket`` to ``s3_complete_bucket`` once the
    upstream RedshiftCopy task is complete.
    """

    date = luigi.DateParameter()
    table = luigi.Parameter()
    fn = luigi.Parameter()
    s3_load_bucket = luigi.Parameter(config_path=dict(section='path', name='s3_load_bucket'))
    s3_complete_bucket = luigi.Parameter(config_path=dict(section='path', name='s3_complete_bucket'))

    # Use the module alias: the bare name `s3` refers to the config class defined in this file,
    # which has no S3Client / S3Target attributes.
    client = luigi_s3.S3Client()

    def requires(self):
        return [RedshiftCopy(date=self.date, table=self.table, fn=self.fn)]

    def _key_name(self):
        """Return the file name of the object the upstream RedshiftCopy loaded from S3.

        ToS3 uploads the file under the local output's base name (``<fn>_<date>.csv``),
        not under the bare ``fn`` parameter, so the name is taken from the upstream
        load path instead of being rebuilt from ``fn``.
        """
        return self.requires()[0].s3_load_path().split('/')[-1]

    def output(self):
        """S3 target for the file at its final location in the complete bucket."""
        return luigi_s3.S3Target(self.s3_complete_bucket + self._key_name(), client=self.client)

    def run(self):
        """Move the uploaded file from the load bucket to the complete bucket."""
        key_name = self._key_name()
        self.client.move(self.s3_load_bucket + key_name, self.s3_complete_bucket + key_name)
# The base class comes from the luigi module alias: the bare name `redshift` refers to the
# config class defined in this file, which has no S3CopyToTable attribute.
class RedshiftCopy(luigi_redshift.S3CopyToTable):
    """Implementation of luigi.contrib.redshift.S3CopyToTable that loads from the S3 file produced by ToS3."""

    date = luigi.DateParameter()
    fn = luigi.Parameter()

    # there are a lot more options available here (i.e. temp table copy, prune based on date or column, etc.)
    table_type = luigi.Parameter(default='')
    table = luigi.Parameter()
    queries = luigi.Parameter(default=[])

    copy_options = "CSV IGNOREHEADER 1 BLANKSASNULL EMPTYASNULL TIMEFORMAT 'auto' DATEFORMAT 'auto'"

    # Connection settings and AWS credentials are read from the `redshift` and `s3`
    # config classes (luigi.cfg sections of the same names) when this class body runs.
    host = redshift().host
    database = redshift().database
    user = redshift().user
    password = redshift().password
    aws_access_key_id = s3().aws_access_key_id
    aws_secret_access_key = s3().aws_secret_access_key

    def s3_load_path(self):
        """Return the S3 path of the file to load, taken from the first required task's output."""
        # optional usages (for the variations noted in requires() below):
        #     return self.input().path
        #     return self.input()['s3'].path
        return self.input()[0].path

    def requires(self):
        """Require the ToS3 upload for the same date and file name."""
        # optional variations are:
        #     return ToS3(...)
        #     return {'s3': ToS3(...)}
        return [
            ToS3(date=self.date, fn=self.fn),
        ]
class ToS3(luigi.Task):
    """Utilization of luigi's s3 module. Pushes single input file to designated s3 bucket."""

    date = luigi.DateParameter()
    fn = luigi.Parameter()
    s3_load_bucket = luigi.Parameter(config_path=dict(section='path', name='s3_load_bucket'))
    tos3_path = luigi.Parameter(config_path=dict(section='path', name='tos3_path'))

    # Use the module alias: the bare name `s3` refers to the config class defined in this file,
    # which has no S3Client / S3Target attributes.
    # you do not have to specify parameters here b/c the S3Client() looks in your luigi.cfg
    client = luigi_s3.S3Client()

    def get_fn(self):
        """Return the file name (last '/'-separated component) of the first required task's output path."""
        return self.input()[0].path.split('/')[-1]

    def requires(self):
        return [
            ExampleBaseTask(date=self.date, fn=self.fn),
        ]

    def output(self):
        """S3 target for the uploaded file inside the load bucket."""
        return luigi_s3.S3Target(self.s3_load_bucket + self.get_fn(), client=self.client)

    def run(self):
        """Upload the local file from ``tos3_path`` to the load bucket under the same file name."""
        file_name = self.get_fn()
        self.client.put(self.tos3_path + '/' + file_name, self.s3_load_bucket + file_name)
class ExampleBaseTask(luigi.Task):
    """Main task to do whatever. Sometimes this may also require other things."""

    date = luigi.DateParameter()
    fn = luigi.Parameter()
    toS3_path = luigi.Parameter(config_path=dict(section='path', name='toS3_path'))

    # This task doesn't 'require' anything, so requires() is left at the luigi.Task default.

    def output(self):
        """Local CSV target located at ``<toS3_path>/<fn>_<date>.csv``."""
        target_path = "{path}/{fn}_{date}.csv".format(
            path=self.toS3_path,
            fn=self.fn,
            date=str(self.date),
        )
        return luigi.LocalTarget(target_path)

    def run(self):
        """Placeholder for the real work; currently only creates an empty output file."""
        # do stuff
        blank_file = self.output().open('w')  # create blank output file
        blank_file.close()
if __name__ == "__main__":
    # Look up the scratch directory under [path] tmp_path in luigi.cfg.
    tmp_path = luigi.configuration.get_config().get('path', 'tmp_path')

    # I keep a tmp directory that is cleared prior to execution:
    # drop it if it is left over from an earlier run, then recreate it empty.
    if os.path.exists(tmp_path):
        shutil.rmtree(tmp_path)
    os.makedirs(tmp_path)

    luigi.run(['ExampleTask', '--workers', '1'])
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[core] | |
default-scheduler-port: 8082 | |
error-email: <your_email>@<domain>.com | |
email-prefix: [LUIGI] | |
email-sender: <email address the notifications are sent from>
email-type: plain | |
max-reschedules: 1 | |
smtp_host: smtp.gmail.com | |
smtp_login: <same as email-sender> | |
smtp_password: <email sender password> | |
smtp_port: 587 | |
[path] | |
tmp_path: /Users/<user>/Desktop/outputs/tmp | |
toS3_path: /Users/<user>/Desktop/outputs/tos3 | |
s3_load_bucket: s3://name.of.your.desired.bucket/ | |
s3_complete_bucket: s3://name.of.your.complete.bucket/ | |
[redshift] | |
host: <cluster_name>.<dns_to_your_redshift_cluster>:<port_num> | |
database: <db_name> | |
user: <db_user> | |
password: <db_password> | |
[s3] | |
aws_access_key_id: <key> | |
aws_secret_access_key: <secret> | |
calling_format: boto.s3.connection.OrdinaryCallingFormat |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment