# Gist vagelim/1dc98c906ce2779f7118 (saved from GitHub; last active February 25, 2016 17:09).
# Luigi job to copy S3 data to Redshift.
# Note: GitHub warned that this file may contain hidden or bidirectional Unicode
# text that could be interpreted or compiled differently than it appears; review
# it in an editor that reveals hidden Unicode characters.
import luigi | |
from luigi import configuration | |
import datetime | |
from luigi.contrib import redshift | |
from mortar.luigi import mortartask | |
from dd_tasks import DatadogPigscriptTask | |
from luigi.s3 import S3Target, S3PathTask | |
from mortar.luigi.s3transfer import S3ToLocalTask | |
import dd_utils | |
#from redshift_utils import CopyToRedshiftTask | |
import logging | |
# Log through the same logger luigi uses for its interface messages.
logger = logging.getLogger('luigi-interface')

# Input file location. It is built once, at import time, from the current
# local date (naive datetime), so every task in this process sees the same path.
now = datetime.datetime.now()
filename = ''.join(['clean/', now.strftime("%Y-%m-%d"), '/part-m-00000'])
S3 = ''.join(('s3://seranking/', filename))
OUT = 's3://seranking/copyjob'
class CopyToRedShift(redshift.S3CopyToTable):
    """Copy the day's cleaned keyword-ranking file from S3 into Redshift.

    The file at the module-level ``S3`` path is loaded into the
    ``seranking_test`` table through luigi's ``S3CopyToTable`` base class.
    Connection settings and AWS keys come from the ``[redshift]`` section
    of the luigi configuration.
    """

    # Not referenced inside this class; presumably consumed elsewhere -- TODO confirm.
    output_base_path = OUT

    # Target table schema as (column name, Redshift type) pairs.
    columns = [
        ('keyword', 'text'),
        ('search_volume', 'int'),
        ('ranking', 'int'),
        ('tags', 'text'),
    ]

    # Input fields are tab-separated; presumably consumed by the luigi base
    # class when building the COPY statement -- confirm for the luigi version in use.
    column_separator = '\t'

    # NOTE(review): none of these parameters is read in this class. In
    # particular, ``env`` does not pick the config keys; redshift_credentials()
    # always reads the ``prod_*`` options. Confirm whether that is intended.
    env = luigi.Parameter(default='prod')
    version = luigi.Parameter(default='1.0')
    cluster_size = luigi.IntParameter(default=0)

    def redshift_credentials(self):
        """Read Redshift connection settings and AWS keys from luigi config.

        Returns:
            dict with keys ``host``, ``port``, ``database``, ``username``,
            ``password``, ``aws_access_key_id`` and ``aws_secret_access_key``.
            Each value comes from the ``[redshift]`` config section, and the
            connection values come from the ``prod_*`` options.
        """
        config = configuration.get_config()
        section = 'redshift'
        return {
            'host': config.get(section, 'prod_host'),
            'port': config.get(section, 'port'),
            'database': config.get(section, 'prod_database'),
            'username': config.get(section, 'prod_username'),
            'password': config.get(section, 'prod_password'),
            'aws_access_key_id': config.get(section, 'aws_access_key_id'),
            'aws_secret_access_key': config.get(section, 'aws_secret_access_key'),
        }

    def s3_load_path(self):
        """Return the S3 path of the file to load (the module-level ``S3``)."""
        return S3

    @property
    def aws_access_key_id(self):
        """AWS access key id from the ``[redshift]`` config section."""
        return self.redshift_credentials()['aws_access_key_id']

    @property
    def aws_secret_access_key(self):
        """AWS secret access key from the ``[redshift]`` config section."""
        return self.redshift_credentials()['aws_secret_access_key']

    @property
    def database(self):
        """Redshift database name (``prod_database`` option)."""
        return self.redshift_credentials()['database']

    @property
    def user(self):
        """Redshift user name (``prod_username`` option)."""
        return self.redshift_credentials()['username']

    @property
    def password(self):
        """Redshift password (``prod_password`` option)."""
        return self.redshift_credentials()['password']

    @property
    def host(self):
        """Redshift endpoint formatted as ``host:port``."""
        # Read the configuration once rather than once per component.
        creds = self.redshift_credentials()
        return '{0}:{1}'.format(creds['host'], creds['port'])

    @property
    def table(self):
        """Name of the Redshift table to load into."""
        return 'seranking_test'

    @property
    def copy_options(self):
        """Extra COPY options. ``IGNOREHEADER 0`` skips no leading rows."""
        return 'IGNOREHEADER 0'
if __name__ == "__main__":
    # Hand control to luigi's command-line runner, using CopyToRedShift as
    # the main task class.
    default_task = CopyToRedShift
    luigi.run(main_task_cls=default_task)
# (GitHub page footer: sign up for free, or sign in, to comment on this gist.)