Python script to export data into a file from MySQL on the MySQL machine, upload it to S3, and copy it into Amazon Redshift. Simple as that. Plus, email notifications. This is just a skeleton.
#!/usr/bin/env python
import os
import sys
import shutil
import logging
import subprocess

import boto3
import sendgrid
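# Local staging paths: MySQL exports the report to /tmp, gzips it, and the
# script then moves the archive into the WIP directory before uploading.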
directory = "/usr/local/automation/wip/"
report_export = "/tmp/report.txt"
report_export_zipped = "/tmp/report.txt.gz"
report_export_moved = "/usr/local/automation/wip/report.txt.gz"
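# File-like wrapper that forwards a stream's writes into the logging module,
# one line at a time, so it can stand in for sys.stdout / sys.stderr.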
class Logger(object):
    def __init__(self, logger, log_level=logging.INFO):
        self.logger = logger
        self.log_level = log_level
        self.linebuf = ''

    def write(self, buf):
        for line in buf.rstrip().splitlines():
            self.logger.log(self.log_level, line.rstrip())

    def flush(self):
        # stream replacements need a flush() method; nothing is buffered here
        pass
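# Root logger configuration: each run truncates and rewrites report.log
# with DEBUG-level output.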
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s:%(levelname)s:%(name)s:%(message)s',
    filename="report.log",
    filemode='w'
)
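# Redirect stdout and stderr into the logger so every print() and traceback
# ends up in report.log.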
def debug_log():
    stdout_logger = logging.getLogger('STDOUT')
    sys.stdout = Logger(stdout_logger, logging.INFO)

def error_log():
    stderr_logger = logging.getLogger('STDERR')
    sys.stderr = Logger(stderr_logger, logging.ERROR)
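# Step 1: refresh the report tables inside MySQL via a stored procedure.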
def transform_report_data():
    with open('report.log', 'a') as f:
        subprocess.call('mysql --login-path=etl -A -e"call random.refresh_report()"',
                        shell=True, stdout=f, stderr=f)
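# Step 2: export the report to /tmp/report.txt via a stored procedure, then gzip it.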
def export_report_data():
    with open('report.log', 'a') as f:
        if os.path.exists(report_export):
            os.remove(report_export)
        subprocess.call('mysql --login-path=etl -A -e"call random.export_report()"; gzip /tmp/report.txt',
                        shell=True, stdout=f, stderr=f)
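# Step 3: move the fresh export into the WIP directory and delete any stale
# copy of the object on S3 so the next upload starts clean.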
def remove_residual_files():
    if os.path.exists(report_export_zipped):
        if os.path.exists(report_export_moved):
            os.remove(report_export_moved)
        shutil.move(report_export_zipped, report_export_moved)
    s3 = boto3.resource('s3')
    bucket = s3.Bucket('base-bucket')
    objs = list(bucket.objects.filter(Prefix='kovid-testing/report.txt.gz'))
    if len(objs) > 0 and objs[0].key == 'kovid-testing/report.txt.gz':
        print("Deleting existing file!")
        bucket.delete_objects(Delete={'Objects': [{'Key': 'kovid-testing/report.txt.gz'}]})
    else:
        print("File doesn't exist")
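# Step 4: upload the gzipped export to S3, skipping the upload if the key
# is already present.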
def upload_file_to_s3():
    s3 = boto3.resource('s3')
    s3up = boto3.client('s3')
    bucket = s3.Bucket('base-bucket')
    objs = list(bucket.objects.filter(Prefix='kovid-testing/report.txt.gz'))
    if len(objs) > 0 and objs[0].key == 'kovid-testing/report.txt.gz':
        print("File already exists!")
    else:
        s3up.upload_file(report_export_zipped, 'base-bucket', 'kovid-testing/report.txt.gz')
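# Step 5: truncate the Redshift table, then COPY the gzipped file in from S3
# using psql against the cluster endpoint.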
def copy_data_to_redshift():
    with open('report.log', 'a') as f:
        postgres_user = "kovidrathee"
        postgres_host = "my-redshift-cluster.2fhd8ghsdghs.ap-southeast-1.redshift.amazonaws.com"
        postgres_port = "5439"
        redshift_dbse = "segment"
        truncate_rdft = "truncate table mydatabase.report;"
        upload_t_rdft = ("copy mydatabase.report from 's3://base-bucket/kovid-testing/report.txt.gz' "
                         "credentials 'aws_access_key_id=AWS_ACCESS_KEY;aws_secret_access_key=AWS_SECRET_KEY' "
                         "delimiter '|' gzip removequotes ESCAPE ACCEPTINVCHARS ACCEPTANYDATE;")
        subprocess.call('psql -U ' + postgres_user + ' -h ' + postgres_host + ' -p ' + postgres_port +
                        ' ' + redshift_dbse + ' -c "' + truncate_rdft + '"',
                        shell=True, stdout=f, stderr=f)
        subprocess.call('psql -U ' + postgres_user + ' -h ' + postgres_host + ' -p ' + postgres_port +
                        ' ' + redshift_dbse + ' -c "' + upload_t_rdft + '"',
                        shell=True, stdout=f, stderr=f)
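# Step 6: send a success notification through the SendGrid v3 Mail Send API.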
def send_email_notification_api():
    sg = sendgrid.SendGridAPIClient(apikey=os.environ.get('SENDGRID_API_KEY'))
    data = {
        "personalizations": [
            {
                "to": [{"email": "[email protected]"}],
                "subject": "ETL Notifications"
            }
        ],
        "from": {"email": "[email protected]"},
        "content": [
            {
                "type": "text/plain",
                "value": "ETL Successful!"
            }
        ]
    }
    response = sg.client.mail.send.post(request_body=data)
    print(response.status_code)
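# Run the pipeline end to end; stdout/stderr are redirected into report.log first.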
if __name__ == '__main__':
    debug_log()
    error_log()
    transform_report_data()
    export_report_data()
    remove_residual_files()
    upload_file_to_s3()
    copy_data_to_redshift()
    send_email_notification_api()