to_redshift.py
import gzip
import codecs
import cStringIO
from functools import wraps
from io import BytesIO

import boto3
import psycopg2
from pandas import DataFrame
from pandas.io.sql import SQLTable, pandasSQL_builder
def monkeypatch_method(cls):
    """
    Creates a decorator for monkey-patching a class
    Recipe from: https://mail.python.org/pipermail/python-dev/2008-January/076194.html
    Args:
        cls: the class to attach the decorated function to
    Returns:
        A decorator that registers the decorated function as an attribute of `cls`
    """
    @wraps(cls)
    def decorator(func):
        setattr(cls, func.__name__, func)
        return func
    return decorator
def resolve_qualname(table_name, schema=None):
    """Returns the schema-qualified table name, e.g. 'my_schema.my_table'."""
    return '.'.join([schema, table_name]) if schema is not None else table_name
@monkeypatch_method(DataFrame)
def to_redshift(self, table_name, engine, s3_bucket, s3_keypath=None,
                schema=None, if_exists='fail', index=True,
                compress=True, aws_access_key_id=None, aws_secret_access_key=None,
                null_as=None, emptyasnull=True):
    """
    Inserts the DataFrame into Redshift by staging a file in S3
    Args:
        self: pandas DataFrame to insert into Redshift
        table_name: name of the destination table
        engine: a SQLAlchemy engine object
        s3_bucket: S3 bucket name
        s3_keypath: key path in S3 (without the bucket name); defaults to the table name
        schema: Redshift schema
        if_exists: {'fail', 'append', 'replace'}
        index: bool; whether to include the DataFrame's index
        compress: gzip-compresses the data before uploading it to S3
        aws_access_key_id: from ~/.boto by default
        aws_secret_access_key: from ~/.boto by default
        null_as: treat these values as null (not tested)
        emptyasnull: bool; whether '' is loaded as null
    Returns:
        None
    """
    # Default the S3 key path to the table name so the key is never literally "None":
    if s3_keypath is None:
        s3_keypath = table_name
    # Stage the data in S3 first; reuse whatever credentials boto resolved so the COPY can use them too:
    url, aws_access_key_id, aws_secret_access_key = self.to_s3(
        bucket=s3_bucket, keypath=s3_keypath, index=index,
        aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key,
        compress=compress)
    qualname = resolve_qualname(table_name, schema)
    table = SQLTable(table_name, pandasSQL_builder(engine, schema=schema),
                     self, if_exists=if_exists, index=index)
    print("Creating table {}".format(qualname))
    if table.exists():
        if if_exists == 'fail':
            raise ValueError("Table Exists")
        elif if_exists == 'append':
            queue = []
        elif if_exists == 'replace':
            queue = ['drop table {};'.format(qualname), table.sql_schema() + ";"]
        else:
            raise ValueError("Bad option for `if_exists`")
    else:
        queue = [table.sql_schema() + ";"]
    stmt = ("COPY {qualname}\n"
            "FROM 's3://{keypath}' \n"
            "CREDENTIALS 'aws_access_key_id={key};aws_secret_access_key={secret}' "
            "{gzip} "
            "{null_as} "
            "{emptyasnull} "
            "CSV IGNOREHEADER 1;").format(qualname=qualname,
                                          keypath=url,
                                          key=aws_access_key_id,
                                          secret=aws_secret_access_key,
                                          gzip="GZIP " if compress else " ",
                                          null_as="NULL AS '{}'".format(null_as) if null_as is not None else "",
                                          emptyasnull="EMPTYASNULL " if emptyasnull else " ")
    queue.append(stmt)
    print("Querying Redshift...")
    with engine.begin() as con:
        for stmt in queue:
            print(stmt)
            con.execute(stmt)
@monkeypatch_method(DataFrame)
def to_s3(self, bucket, keypath, index=True, compress=True, encoding="ascii",
          aws_access_key_id=None, aws_secret_access_key=None):
    """
    Writes the DataFrame to S3
    Args:
        self: DataFrame to upload
        bucket: S3 bucket name
        keypath: S3 key path (without the bucket name)
        index: whether to include the DataFrame's index
        compress: whether to gzip-compress the data before uploading it
        encoding: ascii by default
        aws_access_key_id: from ~/.boto by default
        aws_secret_access_key: from ~/.boto by default
    Returns:
        The S3 URL of the file, and the credentials used to upload it
    """
print "Exporting to S3..." | |
# Figure out paths: | |
keypath = "{filename}.{ext}".format(filename=keypath, ext="gzip" if compress else "csv") | |
url = bucket + '/' + keypath | |
# Create S3 session | |
session = boto3.Session(aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key) | |
aws_access_key_id = session.get_credentials().access_key | |
aws_secret_access_key = session.get_credentials().secret_key | |
s3 = session.resource('s3') | |
obj = s3.Object(bucket_name=bucket, key=keypath) | |
# Create a memory file that allows unicode: | |
buffer = cStringIO.StringIO() | |
codecinfo = codecs.lookup("utf8") | |
fp = codecs.StreamReaderWriter(buffer, codecinfo.streamreader, codecinfo.streamwriter) | |
# Compress | |
gzfp = BytesIO() | |
self.to_csv(fp, index=index, encoding=encoding) | |
if compress: | |
print "Compressing" | |
fp.seek(0) | |
gzipped = gzip.GzipFile(fileobj=gzfp, mode='w') | |
gzipped.write(bytes(fp.read())) | |
gzipped.close() | |
gzfp.seek(0) | |
else: | |
gzfp = fp | |
gzfp.seek(0) | |
print("Uploading to {}".format(url)) | |
obj.upload_fileobj(gzfp) | |
return url, aws_access_key_id, aws_secret_access_key | |
Summary
This code monkey-patches the pandas DataFrame class so that a DataFrame can be uploaded to Redshift via S3. The original code had errors that I fixed, and I also refactored it to make it more readable.
Usage
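A minimal sketch of how the patched method can be called, assuming the file above is importable as to_redshift; the connection string, bucket, key path, and table name below are placeholders, not values from the gist:

import pandas as pd
from sqlalchemy import create_engine
import to_redshift  # importing the module applies the monkey-patches to DataFrame

# Placeholder connection string and names -- substitute your own cluster, bucket, and table.
engine = create_engine("postgresql+psycopg2://user:password@example-cluster:5439/mydb")
df = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
df.to_redshift("my_table", engine,
               s3_bucket="my-bucket", s3_keypath="staging/my_table",
               schema="public", if_exists="replace", index=False)

With compress=True (the default), the CSV is gzipped before upload and the generated COPY statement includes the GZIP option; credentials are taken from the boto configuration unless passed explicitly.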