Created
October 17, 2018 05:40
-
-
Save dharamsk/ce66dc8ce211d3436034bec4a37e4be4 to your computer and use it in GitHub Desktop.
Redshift: Programmatically configure dist style and sort key of *existing* tables
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# I wrote this script to alter 700 Redshift tables to DISTSTYLE ALL (and remove their sort keys),
# but it is partially set up to specify any dist style and sort key on a table-by-table basis;
# all that's needed is to modify the main() function to accept a dict with the table configs.
# Redshift clusters with large node types will waste disk space and network bandwidth
# when small tables use the EVEN or KEY (DISTKEY) dist styles.
# Sort keys will double the minimum size of a table, also wasting space.
# See here for more on minimum table size calculation:
# https://aws.amazon.com/premiumsupport/knowledge-center/redshift-cluster-storage-space/
# KNOWN ISSUE: if the table name is reserved (e.g. region), it will get double-quoted and break this script.
# KNOWN ISSUE: if there are views or table constraints that depend on the table, this will error.
# The admin views used here were created from:
# https://github.com/awslabs/amazon-redshift-utils/tree/master/src/AdminViews
# Happy scripting; use at your own risk, etc.
import sqlalchemy | |
import os | |
import time | |
# Suffix appended to the original table's name when it is swapped out; the
# old table is later dropped under this name once validation passes.
OLD_TABLE_SUFFIX = '__autostyled_20181013'
# One row per line of the table's current DDL. The ORDER BY matters:
# get_create_sql finds the ')' that closes the column list by its position
# in the result, and an outer query on a view is not guaranteed to keep the
# view's row order.
# NOTE(review): relies on the `seq` column of the amazon-redshift-utils
# v_generate_tbl_ddl view; confirm it exists in the installed version.
CREATE_SQL = """
select ddl from admin.v_generate_tbl_ddl v
where v.tablename = '{table}'
and v.schemaname = '{schema}'
order by v.seq
;
"""
# GRANT statements currently applied to the table (functions excluded).
PERMISSION_SQL = """
select ddl from admin.v_generate_user_grant_revoke_ddl
where objtype != 'Function'
and schemaname = '{schema}'
and ddltype = 'grant'
and objname = '{table}'
;
"""
# Current owner of the table.
OWNER_SQL = """
select tableowner
from pg_tables
where tablename = '{table}'
and schemaname = '{schema}'
"""
NEW_OWNER_SQL = "ALTER TABLE {new_table_name} OWNER TO \"{owner}\";"
# Copy every row from the original table into the new one.
INSERT_SQL = """
insert into {new_table_name}
select * from {full_table_name}
-- select distinct * if you want to dedupe while you're at it,
-- but that will mess with the validation sql results below
;
"""
# Swap names inside one transaction, then analyze the table now living
# under the original name (i.e. the new copy).
RENAME_SQL = """
BEGIN;
alter table {full_table_name} rename to {table}{suffix};
alter table {new_table_name} rename to {table};
END;
ANALYZE {full_table_name};
"""
# True when the new table has at least as many rows as the renamed old one.
VALIDATION_SQL = """
select b.new >= a.old
from (select count(*) as old from {full_table_name}{suffix}) a
, (select count(*) as new from {full_table_name}) b
;
"""
DROP_SQL = """
drop table {old_table_with_suffix}
;
"""
# SQLAlchemy connection URL; os.environ.get yields None when the variable is unset.
CONN_URI = os.environ.get('YOUR_SPECIAL_CONN_URI_VARIABLE_NAME')
# Engines keyed by connection URL, so every statement reuses one connection
# pool instead of constructing (and abandoning) a new Engine per call.
_ENGINES = {}


def get_engine(CONN_URI):
    """Return a new connection from a cached SQLAlchemy engine.

    Despite its name this returns a *connection* checked out from the
    engine, not the engine itself; callers use it as a context manager.

    Args:
        CONN_URI: SQLAlchemy database URL. The parameter keeps its original
            name (it shadows the module-level constant of the same name).

    Returns:
        A SQLAlchemy connection object.
    """
    engine = _ENGINES.get(CONN_URI)
    if engine is None:
        engine = sqlalchemy.create_engine(CONN_URI)
        _ENGINES[CONN_URI] = engine
    return engine.connect()
def get_prefix():
    """Return a prefix for the new table name: 'n_' plus whole-second Unix time."""
    epoch_seconds = int(time.time())
    return 'n_' + str(epoch_seconds)
def get_create_sql(table, schema,
                   diststyle, distkey, sortkey,
                   new_table_name):
    """Build the CREATE statement for the re-styled copy of ``schema.table``.

    The original DDL lines are read via ``CREATE_SQL``. Everything after the
    line holding the ``)`` that closes the column list is discarded (the old
    dist/sort options and trailing ``;``). The requested options are then
    appended, and every occurrence of ``schema.table`` is rewritten to
    ``new_table_name``.

    Args:
        table: Unqualified table name.
        schema: Schema containing the table.
        diststyle: 'ALL' or 'EVEN' emits ``DISTSTYLE <style>``. Otherwise
            ``distkey`` is used if given. Failing that, 'ALL'/'EVEN'/'AUTO'
            in any letter case is also accepted.
        distkey: Column for ``DISTKEY (...)``. It is used when ``diststyle``
            is not exactly 'ALL' or 'EVEN'.
        sortkey: A sort key column, a comma-separated column string, or a
            list/tuple of column names. A falsy value means no sort key.
        new_table_name: Schema-qualified name for the new table.

    Returns:
        The DDL as one newline-joined string. The ``--DROP TABLE`` line
        (already pointing at ``new_table_name``) is turned into an active
        ``DROP TABLE IF EXISTS``.

    Raises:
        ValueError: if no usable diststyle or distkey was given.
        Exception: if the DDL lookup looks wrong, or if ``schema.table``
            does not appear in it (e.g. the name was emitted quoted).
    """
    # Resolve the distribution clause before touching the database. The
    # original code fell through here with `dist` unbound.
    if diststyle in ('ALL', 'EVEN'):
        dist = 'DISTSTYLE {}'.format(diststyle)
    elif distkey:
        dist = 'DISTKEY ({})'.format(distkey)
    elif str(diststyle).upper() in ('ALL', 'EVEN', 'AUTO'):
        # Only reached for inputs the original code crashed on.
        dist = 'DISTSTYLE {}'.format(str(diststyle).upper())
    else:
        raise ValueError(
            'need diststyle ALL/EVEN/AUTO or a distkey, got diststyle={!r}'.format(diststyle))
    if isinstance(sortkey, (list, tuple)):
        sortkey = ', '.join(sortkey)

    ddl_lines = [str(row[0]) for row in execute(CREATE_SQL.format(table=table, schema=schema))]
    # make sure the result generally makes sense
    if len(ddl_lines) < 2 or ')' not in ddl_lines:
        print(ddl_lines)
        raise Exception('bad query results')
    # keep up to and including the ')' line; drops old dist/sort options and ';'
    ddl_lines = ddl_lines[:ddl_lines.index(')') + 1]
    ddl_lines.append(dist)
    if sortkey:
        ddl_lines.append('SORTKEY ({})'.format(sortkey))
    ddl_lines.append(';')

    full_table_name = '{}.{}'.format(schema, table)
    renamed_lines = [line.replace(full_table_name, new_table_name) for line in ddl_lines]
    if renamed_lines == ddl_lines:
        raise Exception('the mapped results didnt change')
    return '\n'.join(renamed_lines).replace('--DROP TABLE', 'DROP TABLE IF EXISTS')
def get_permissions_sql(table,
                        schema,
                        new_table_name):
    """Build the SQL that copies owner and grants from ``schema.table`` to ``new_table_name``.

    Args:
        table: Unqualified table name.
        schema: Schema containing the table.
        new_table_name: Schema-qualified name of the new table.

    Returns:
        Newline-joined SQL. The first statement is the ``ALTER TABLE ...
        OWNER TO`` statement; the existing GRANT statements follow, with
        ``schema.table`` rewritten to ``new_table_name``.

    Raises:
        Exception: if no owner row is found for ``schema.table``.
    """
    full_table_name = '{}.{}'.format(schema, table)
    grant_rows = execute(PERMISSION_SQL.format(table=table, schema=schema))
    # consume the rows now, before the next query is issued
    statements = [str(row[0]).replace(full_table_name, new_table_name)
                  for row in grant_rows]

    owner_row = execute(OWNER_SQL.format(schema=schema, table=table)).fetchone()
    if owner_row is None:
        raise Exception('no owner found for table {}'.format(full_table_name))
    # double embedded quotes so the owner stays one quoted identifier
    owner = str(owner_row[0]).replace('"', '""')
    statements.insert(0, NEW_OWNER_SQL.format(new_table_name=new_table_name,
                                              owner=owner))
    return '\n'.join(statements)
def execute(command):
    """Run ``command`` on a connection from ``get_engine`` and return the result.

    NOTE(review): the ``with`` block has already exited when the caller
    receives the result, so every caller reads it immediately. Confirm this
    is safe for the installed SQLAlchemy version and driver.
    """
    with get_engine(CONN_URI) as connection:
        outcome = connection.execute(command)
    return outcome
def _run_timed(command, done_message):
    """Execute ``command``, then print ``done_message`` with the elapsed time."""
    start_time = time.time()
    execute(command)
    compl_time = "--- %s seconds ---" % (time.time() - start_time)
    print(done_message + ' in ' + compl_time)


def alter_table_attributes(
        table,
        schema,
        diststyle='ALL',
        distkey=None,
        sortkey=None):
    """Rebuild ``schema.table`` with new distribution and sort settings.

    Steps:
    1. Create ``<schema>.<get_prefix()>_<table>`` with the requested options.
    2. Copy the owner and grants (best effort).
    3. Copy all rows; if that fails, drop the new table and return.
    4. Swap names so the old table becomes ``<table>`` + OLD_TABLE_SUFFIX.
    5. Check the row counts.
    6. Drop the old table.

    Args:
        table: Unqualified table name.
        schema: Schema containing the table.
        diststyle: Passed to ``get_create_sql``; defaults to 'ALL'.
        distkey: Passed to ``get_create_sql``.
        sortkey: Passed to ``get_create_sql``; None means no sort key.

    Raises:
        Exception: 'table row count mismatch' if the new table has fewer
            rows than the old one. This happens after the swap, so the old
            table is left in place under its suffixed name.
    """
    new_table_prefix = get_prefix()
    full_table_name = '{}.{}'.format(schema, table)
    quoted_table_name = '{}."{}"'.format(schema, table)
    new_table_name = '{}.{}_{}'.format(schema, new_table_prefix, table)

    # Build every statement up front.
    create_command = get_create_sql(
        table=table,
        schema=schema,
        diststyle=diststyle,
        distkey=distkey,
        sortkey=sortkey,
        new_table_name=new_table_name)
    permissions_command = get_permissions_sql(
        table=table,
        schema=schema,
        new_table_name=new_table_name)
    copy_command = INSERT_SQL.format(full_table_name=full_table_name,
                                     new_table_name=new_table_name)
    rename_command = RENAME_SQL.format(full_table_name=full_table_name,
                                       new_table_name=new_table_name,
                                       table=table,
                                       suffix=OLD_TABLE_SUFFIX)
    validation_command = VALIDATION_SQL.format(full_table_name=full_table_name,
                                               suffix=OLD_TABLE_SUFFIX)
    old_table_with_suffix = '{}{}'.format(full_table_name, OLD_TABLE_SUFFIX)
    drop_command = DROP_SQL.format(old_table_with_suffix=old_table_with_suffix)

    # If the original name survived the rename in the CREATE statement
    # (e.g. it was emitted quoted), running it could make the generated
    # DROP TABLE IF EXISTS target the original table. Print and skip instead.
    if quoted_table_name in create_command or full_table_name in create_command:
        print('-------------------- .............. >.>>>>>>>>>>>>>>>>>> SKIPPING TABLE AS TABLE NAME IS RESERVED ##################################')
        for command in (create_command, permissions_command, copy_command,
                        rename_command, validation_command, drop_command):
            print(command)
        return

    print('Creating Table {}'.format(new_table_name))
    _run_timed(create_command, 'Table Created')

    print('Fixing Perms on table {}'.format(new_table_name))
    start_time = time.time()
    try:
        execute(permissions_command)
    except Exception as exc:
        # Best effort (as before): report the failing statement and keep going.
        print('EXCEPTION ----------------------------- >>>>>>>> permissions statement failed: {}'.format(exc))
        print(permissions_command)
    else:
        compl_time = "--- %s seconds ---" % (time.time() - start_time)
        print('Perms Fixed in ' + compl_time)

    try:
        _run_timed(copy_command, 'Rows Inserted')
    except Exception as exc:
        # Undo the half-built copy; the original table is still untouched.
        execute('DROP TABLE IF EXISTS {}'.format(new_table_name))
        print('Insert failed, reverting and moving on..')
        print(exc)
        return

    _run_timed(rename_command, 'Tables Renamed')

    valid = execute(validation_command).first()[0]
    if not valid:
        raise Exception('table row count mismatch')
    _run_timed(drop_command, 'Old Table Dropped')
def main(current_list, table_configs=None):
    """Re-style every table named in ``current_list``, in order.

    Args:
        current_list: Iterable of ``'schema.table'`` strings.
        table_configs: Optional mapping from ``'schema.table'`` to keyword
            arguments for ``alter_table_attributes`` (``diststyle``,
            ``distkey``, ``sortkey``). Tables without an entry use
            ``diststyle='ALL'`` and that function's other defaults.

    Raises:
        Exception: 'table name error' if an entry is not exactly
            ``schema.table``.
    """
    table_configs = table_configs or {}
    for position, qualified_name in enumerate(current_list, start=1):
        print('Table ' + str(position) + ': ' + qualified_name)
        print(time.strftime("%Y-%m-%d %H:%M:%S"))
        parts = qualified_name.split('.')
        if len(parts) != 2:
            print(parts)
            raise Exception('table name error')
        schema, table = parts
        # Default used for the original run: DISTSTYLE ALL and no sort key.
        options = {'diststyle': 'ALL'}
        options.update(table_configs.get(qualified_name, {}))
        alter_table_attributes(table, schema, **options)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment