Last active
July 11, 2023 17:41
-
-
Save mmautner/c102bff31d386679f021 to your computer and use it in GitHub Desktop.
Redshift Upserts w/ Python
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import uuid | |
import psycopg2 | |
from secret import REDSHIFT_CREDS | |
from secret import AWS_ACCESS_KEY, AWS_SECRET_KEY | |
def get_primary_keys(tablename, db):
    """Return the column names of the first index listed for *tablename*.

    Reads ``indexdef`` from ``pg_indexes`` and parses the parenthesised
    column list out of the first row returned.

    Args:
        tablename: Name of the table to look up (str).
        db: Open DB-API connection providing ``cursor()``.

    Returns:
        List of column names (str) with surrounding whitespace and double
        quotes stripped.

    Raises:
        IndexError: If ``pg_indexes`` returns no row for *tablename*, or the
            index definition contains no ``(``.
    """
    # Bind the table name as a query parameter instead of interpolating it
    # into the SQL text, so quotes in the name cannot alter the statement.
    sql = "select indexdef from pg_indexes where tablename = %s;"
    c = db.cursor()
    try:
        c.execute(sql, (tablename,))
        rows = c.fetchall()
    finally:
        c.close()
    if not rows:
        raise IndexError(
            "no index definition found for table %r" % (tablename,))
    # NOTE(review): only the first row is used and the lookup is not
    # schema-qualified; presumably that row is the primary-key index -- confirm.
    indexdef = rows[0][0]
    # Take the text after the first '(' and split it on commas; this assumes
    # a definition shaped like '... (col_a, "col_b")'.
    rfields = indexdef.split('(')[1].strip(')').split(',')
    return [field.strip().strip('"') for field in rfields]
def load_s3_file_into_redshift(s3path, tablename, delimiter='|'):
    """Upsert a gzipped, delimited S3 file into a Redshift table.

    The file is COPYed into a temporary table created ``LIKE`` the
    destination. Rows in the destination that match a staged row on the
    columns returned by ``get_primary_keys`` are deleted, then every staged
    row is inserted.

    Args:
        s3path: Location of the file on Amazon S3 (str). Must be gzipped.
        tablename: Destination table name (str).
        delimiter: Field delimiter used in the file (str).

    Returns:
        ``s3path``, unchanged.

    NOTE: ``s3path``, ``tablename`` and ``delimiter`` are interpolated
    directly into the SQL text, so callers must pass trusted values only.
    """
    # ``UUID.hex`` is available on both Python 2 and 3; the ``get_hex()``
    # method exists only on Python 2.
    temp_tablename = 'temp_%s' % uuid.uuid4().hex
    db = psycopg2.connect(**REDSHIFT_CREDS)
    try:
        primary_keys = get_primary_keys(tablename, db)
        # One equality per key column, matching destination to staging rows.
        join_clause = ' AND '.join(
            '{dest}.{pk} = {src}.{pk}'.format(
                dest=tablename, src=temp_tablename, pk=pk)
            for pk in primary_keys)
        upsert_qry = """\
        CREATE TEMPORARY TABLE {src} (LIKE {dest});
        COPY {src} FROM '{s3path}'
        CREDENTIALS 'aws_access_key_id={access_key};aws_secret_access_key={secret_key}'
        DELIMITER '{delimiter}' gzip;
        BEGIN;
        LOCK {dest};
        DELETE FROM {dest} USING {src} WHERE {join_clause};
        INSERT INTO {dest} SELECT * FROM {src};
        END;
        """.format(dest=tablename,
                   src=temp_tablename,
                   s3path=s3path,
                   join_clause=join_clause,
                   access_key=AWS_ACCESS_KEY,
                   secret_key=AWS_SECRET_KEY,
                   delimiter=delimiter)
        c = db.cursor()
        try:
            c.execute(upsert_qry)
        finally:
            c.close()
        db.commit()
    finally:
        # Always release the connection, even if a statement failed.
        db.close()
    return s3path
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for this!