Skip to content

Instantly share code, notes, and snippets.

@kovid-rathee
Last active February 11, 2017 16:00
Show Gist options
  • Save kovid-rathee/160ffaa6233b298474cca21a25c4667d to your computer and use it in GitHub Desktop.
Save kovid-rathee/160ffaa6233b298474cca21a25c4667d to your computer and use it in GitHub Desktop.
Python script that exports data from MySQL into a file on the MySQL machine, uploads the file to S3, copies it into Amazon Redshift, and sends an email notification. This is just a skeleton.
#!/usr/bin/env python
import os
import sys
import shlex
import boto3
import shutil
import logging
import urllib2
import botocore
import sendgrid
import subprocess
from datetime import datetime
# Directory that receives the archived copy of the zipped export.
directory = "/usr/local/automation/wip/"
# Plain-text export path. gzip is later run on this exact file; presumably it
# is written by the random.export_report() procedure -- confirm.
report_export = "/tmp/report.txt"
# gzip replaces report.txt with report.txt.gz; derive the name rather than
# repeating the literal so the two can never disagree.
report_export_zipped = report_export + ".gz"
# Archive location for the zipped export inside `directory`.
report_export_moved = os.path.join(directory, os.path.basename(report_export_zipped))
class Logger(object):
    """File-like adapter that forwards written text to a ``logging.Logger``.

    Meant to be assigned to ``sys.stdout`` / ``sys.stderr`` so that text
    written to those streams is recorded through logging instead.

    :param logger: the ``logging.Logger`` that receives each line.
    :param log_level: level used for every forwarded line (default INFO).
    """

    def __init__(self, logger, log_level=logging.INFO):
        self.logger = logger
        self.log_level = log_level
        # Not read by this class; kept so existing references keep working.
        self.linebuf = ''

    def write(self, buf):
        """Log each line of *buf*, with trailing whitespace stripped.

        Whitespace-only writes (such as the lone newline ``print`` emits)
        produce no records.
        """
        for line in buf.rstrip().splitlines():
            self.logger.log(self.log_level, line.rstrip())

    def flush(self):
        """Do nothing: ``write`` hands every line to logging immediately.

        Required because code that writes to ``sys.stdout``/``sys.stderr``
        (and the interpreter at shutdown) calls ``flush()``; without this
        method that call raises ``AttributeError``.
        """
# Root logging setup: records at DEBUG and above go to report.log (relative to
# the current working directory). filemode 'w' truncates the file on start-up.
_LOG_SETTINGS = dict(
    filename="report.log",
    filemode='w',
    level=logging.DEBUG,
    format='%(asctime)s:%(levelname)s:%(name)s:%(message)s',
)
logging.basicConfig(**_LOG_SETTINGS)
def debug_log():
    """Replace ``sys.stdout`` with a ``Logger`` bound to the 'STDOUT' logger.

    Despite the name, captured output is logged at INFO, not DEBUG.
    """
    sys.stdout = Logger(logging.getLogger('STDOUT'), logging.INFO)
def error_log():
    """Replace ``sys.stderr`` with a ``Logger`` bound to the 'STDERR' logger.

    Everything written to stderr is recorded at ERROR level.
    """
    sys.stderr = Logger(logging.getLogger('STDERR'), logging.ERROR)
def transform_report_data():
    """Run the ``random.refresh_report()`` stored procedure via the mysql CLI.

    The client's stdout and stderr are appended to report.log. The call
    blocks until mysql exits, so no later step starts while the refresh is
    still running.

    :raises subprocess.CalledProcessError: if mysql exits with a non-zero
        status, so a failed refresh is not silently ignored.
    """
    command = ['mysql', '--login-path=etl', '-A', '-e', 'call random.refresh_report()']
    with open('report.log', 'a') as log_file:
        # check_call() waits for the process and checks its exit status;
        # a bare Popen() did neither.
        subprocess.check_call(command, stdout=log_file, stderr=log_file)
def export_report_data():
    """Export the report from MySQL to ``report_export`` and gzip it.

    Calls the ``random.export_report()`` stored procedure and then gzips
    ``report_export`` into ``report_export_zipped``. Output of both commands
    is appended to report.log, and both run to completion before this
    function returns.

    NOTE(review): the pre-delete of ``report_export`` suggests the procedure
    uses SELECT ... INTO OUTFILE, which refuses to overwrite an existing
    file -- confirm against the procedure definition.

    :raises subprocess.CalledProcessError: if either command exits non-zero.
    """
    # The export target must not already exist, and non-interactive gzip will
    # not overwrite an existing .gz, so clear both leftovers first.
    for stale_path in (report_export, report_export_zipped):
        if os.path.exists(stale_path):
            os.remove(stale_path)
    with open('report.log', 'a') as log_file:
        # Run the steps one after another and wait for each. The previous
        # "export; gzip" shell string ran gzip even when the export failed,
        # and Popen() let later steps start before the .gz existed.
        subprocess.check_call(
            ['mysql', '--login-path=etl', '-A', '-e', 'call random.export_report()'],
            stdout=log_file,
            stderr=log_file,
        )
        subprocess.check_call(['gzip', report_export], stdout=log_file, stderr=log_file)
def remove_residual_files():
    """Clear leftovers of a previous run, locally and in S3.

    * If a zipped export is still at ``report_export_zipped``, move it to
      ``report_export_moved``, replacing any older copy there.
    * If ``s3://base-bucket/kovid-testing/report.txt.gz`` exists, delete it.
      ``upload_file_to_s3()`` skips the upload when that object is present.

    NOTE: the local file is moved away, so this must run before the export
    step creates the fresh ``report_export_zipped`` that the upload reads.
    """
    if os.path.exists(report_export_zipped):
        if os.path.exists(report_export_moved):
            os.remove(report_export_moved)
        print(shutil.move(report_export_zipped, report_export_moved))
    key = 'kovid-testing/report.txt.gz'
    bucket = boto3.resource('s3').Bucket('base-bucket')
    # S3 lists keys in ascending order and `key` is a prefix of every match,
    # so if the exact object exists it is the first result; one is enough.
    matches = list(bucket.objects.filter(Prefix=key).limit(1))
    if matches and matches[0].key == key:
        print("Deleting Existing File!")
        bucket.delete_objects(Delete={'Objects': [{'Key': key}]})
    else:
        print("File Doesn't exist")
def upload_file_to_s3():
    """Upload ``report_export_zipped`` to s3://base-bucket/kovid-testing/report.txt.gz.

    If an object with that key already exists, nothing is uploaded and
    "File Already Exists!" is printed. A stale object therefore has to be
    deleted beforehand for fresh data to reach S3.
    """
    key = 'kovid-testing/report.txt.gz'
    bucket = boto3.resource('s3').Bucket('base-bucket')
    # The exact key, if present, sorts first among keys with that prefix.
    existing = list(bucket.objects.filter(Prefix=key).limit(1))
    if existing and existing[0].key == key:
        print("File Already Exists!")
    else:
        # The Bucket resource uploads directly; no separate client is needed.
        bucket.upload_file(report_export_zipped, key)
def copy_data_to_redshift():
    """Reload ``mydatabase.report`` in Redshift from the zipped S3 export.

    Runs TRUNCATE and then COPY through the psql CLI. Each statement is a
    separate psql invocation that must finish before the next starts. psql
    output is appended to report.log.

    :raises subprocess.CalledProcessError: if either psql call exits non-zero.
    """
    postgres_user = "kovidrathee"
    postgres_host = "my-redshift-cluster.2fhd8ghsdghs.ap-southeast-1.redshift.amazonaws.com"
    postgres_port = "5439"
    redshift_dbse = "segment"
    truncate_rdft = "truncate table mydatabase.report;"
    # NOTE(review): AWS_ACCESS_KEY / AWS_SECRET_KEY look like placeholders.
    # Real keys here would appear in the process list and in source control;
    # consider an IAM role instead.
    upload_t_rdft = "copy mydatabase.report from 's3://base-bucket/kovid-testing/report.txt.gz' credentials 'aws_access_key_id=AWS_ACCESS_KEY;aws_secret_access_key=AWS_SECRET_KEY' delimiter '|' gzip removequotes ESCAPE ACCEPTINVCHARS ACCEPTANYDATE;"
    # Argument list instead of a shell string: no shell quoting of the SQL.
    psql_base = ['psql', '-U', postgres_user, '-h', postgres_host, '-p', postgres_port, redshift_dbse]
    with open('report.log', 'a') as log_file:
        for statement in (truncate_rdft, upload_t_rdft):
            # Wait for each statement. With two un-awaited Popen() calls the
            # COPY could run before the TRUNCATE and then be wiped by it.
            subprocess.check_call(psql_base + ['-c', statement], stdout=log_file, stderr=log_file)
def send_email_notification_api():
    """Send the fixed "ETL Successful!" notification email through SendGrid.

    The API key is taken from the SENDGRID_API_KEY environment variable.
    NOTE(review): the address literals appear mangled/placeholder in this
    copy -- confirm the real recipient and sender.
    """
    recipient = {"email": "[email protected]"}
    sender = {"email": "[email protected]"}
    message = {
        "personalizations": [
            {"to": [recipient], "subject": "ETL Notifications"},
        ],
        "from": sender,
        "content": [
            {"type": "text/plain", "value": "ETL Successful!"},
        ],
    }
    api = sendgrid.SendGridAPIClient(apikey=os.environ.get('SENDGRID_API_KEY'))
    api.client.mail.send.post(request_body=message)
# Pipeline steps, executed at module level whenever this file is run.
# stderr is redirected into the log first, so tracebacks from later steps
# are recorded there.
error_log()
transform_report_data()
# Clear the previous run's local .gz and S3 object *before* exporting.
# remove_residual_files() moves report_export_zipped away, and
# upload_file_to_s3() reads that same path.
remove_residual_files()
export_report_data()
upload_file_to_s3()
copy_data_to_redshift()
send_email_notification_api()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment