Clone a table from one Redshift database (cluster) to another via S3 (in Python)
# Redshift DB: Cloning a table from one Redshift database to another via S3

The steps:

1. Unload the source table to S3.
2. Create the target table (note: its schema must match the source table's).
3. Copy the data files from S3 into the target table.

We will use Python to handle and run the steps.
```python
creds_src = {
    'host': '',
    'user': '',
    'password': '',
    'port': 5439,
    'dbname': '',
}
creds_dst = {
    'host': '',
    'user': '',
    'password': '',
    'port': 5439,
    'dbname': '',
}

# -- source table
TABLE_SRC = 'users'
SCHEMA_SRC = 'public'
# -- destination table
TABLE_DST = 'users'
SCHEMA_DST = 'public'

ACCESS_KEY_ID = ''
SECRET_ACCESS_KEY = ''
TARGET_S3 = ''
```
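The credentials above are intentionally left blank. If you adapt this notebook, a safer pattern than typing secrets into a cell is to read them from environment variables; a minimal sketch (the variable names below are hypothetical and not part of the original gist; `creds_dst` would be filled in the same way):

```python
import os

creds_src = {
    'host': os.environ['REDSHIFT_SRC_HOST'],          # hypothetical variable names
    'user': os.environ['REDSHIFT_SRC_USER'],
    'password': os.environ['REDSHIFT_SRC_PASSWORD'],
    'port': int(os.environ.get('REDSHIFT_SRC_PORT', 5439)),
    'dbname': os.environ['REDSHIFT_SRC_DBNAME'],
}

ACCESS_KEY_ID = os.environ['AWS_ACCESS_KEY_ID']
SECRET_ACCESS_KEY = os.environ['AWS_SECRET_ACCESS_KEY']
TARGET_S3 = os.environ['CLONE_TARGET_S3']  # e.g. 's3://my-bucket/users_unload/'
```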
```python
import psycopg2
import pandas as pd

conn_src = psycopg2.connect(**creds_src)
conn_dst = psycopg2.connect(**creds_dst)
```
```python
# -- first, check the source table
pd.read_sql(f'select count(*) from {SCHEMA_SRC}.{TABLE_SRC};', con=conn_src)
```

Output:

```
     count
0  1731523
```
```python
%%time
# -- unload the source table from the source database to S3
# see: https://docs.aws.amazon.com/redshift/latest/dg/t_Unloading_tables.html
unload = f"""
unload ('select * from {SCHEMA_SRC}.{TABLE_SRC}')
to '{TARGET_S3}'
access_key_id '{ACCESS_KEY_ID}'
secret_access_key '{SECRET_ACCESS_KEY}'
delimiter '\t'
manifest
allowoverwrite;
"""
cur_src = conn_src.cursor()
cur_src.execute(unload)
cur_src.close()
```

Output:

```
CPU times: user 1.49 ms, sys: 2.07 ms, total: 3.55 ms
Wall time: 4.52 s
```
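Optionally, confirm that the UNLOAD actually wrote files under the S3 prefix before moving on. A minimal sketch using boto3 (not used in the original notebook), assuming `TARGET_S3` is an `s3://bucket/prefix/` style URL and your AWS credentials are available to boto3:

```python
import boto3
from urllib.parse import urlparse

# Split the s3://bucket/prefix URL into bucket name and key prefix.
parsed = urlparse(TARGET_S3)
bucket, prefix = parsed.netloc, parsed.path.lstrip('/')

# List the unloaded data slices (and the manifest file) under the prefix.
s3 = boto3.client('s3')
resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in resp.get('Contents', []):
    print(obj['Key'], obj['Size'])
```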
```python
# --
# A quick hack for creating the target table with the proper schema, based on the source table
# --

# -- instead of creating the new table by hand (with all the annoying handling of schema and data types),
#    read a tiny sample of the source table,
sample = pd.read_sql(f'select * from {SCHEMA_SRC}.{TABLE_SRC} limit 3;', con=conn_src)
# -- then let pandas create the table in the destination database and upload the sample into it,
from sqlalchemy import create_engine
engine = create_engine(f"postgresql://{creds_dst['user']}:{creds_dst['password']}@{creds_dst['host']}:5439/{creds_dst['dbname']}")
sample.to_sql(TABLE_DST, schema=SCHEMA_DST, index=False, con=engine)
# -- finally, truncate the destination table so the sample rows aren't duplicated by the COPY below
cur_dst = conn_dst.cursor()
cur_dst.execute(f'TRUNCATE {SCHEMA_DST}.{TABLE_DST};')
```
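The pandas `to_sql()` shortcut is convenient, but the column types it infers from a three-row sample will not necessarily match the source table, and Redshift-specific properties (distribution style, sort keys, encodings) are lost. If you need a closer match, one alternative is to read the column definitions from the source and build the `CREATE TABLE` yourself. A rough sketch based on `information_schema.columns` (my own addition, not from the gist; it still ignores dist/sort keys, encodings, and constraints):

```python
# Read column names and types from the source table, in their original order.
cols = pd.read_sql(
    f"""
    select column_name, data_type, character_maximum_length
    from information_schema.columns
    where table_schema = '{SCHEMA_SRC}' and table_name = '{TABLE_SRC}'
    order by ordinal_position;
    """,
    con=conn_src,
)

# Build a plain column list, preserving varchar lengths where they are reported.
defs = []
for _, row in cols.iterrows():
    typ = row['data_type']
    if typ == 'character varying' and pd.notnull(row['character_maximum_length']):
        typ = f"varchar({int(row['character_maximum_length'])})"
    defs.append(f"{row['column_name']} {typ}")

ddl = (
    f"create table if not exists {SCHEMA_DST}.{TABLE_DST} (\n  "
    + ",\n  ".join(defs)
    + "\n);"
)
print(ddl)
# cur_dst.execute(ddl)  # run this instead of the to_sql() sample-upload hack above
```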
```python
# -- sanity check
pd.read_sql(f'select * from {SCHEMA_DST}.{TABLE_DST}', con=conn_dst)
```

Output:

```
Empty DataFrame
Columns: [user_id, created, updated, typ, username, first_name, last_name, mail_address, company, title, industry_id, job_level_id, social_signup, paid_expired, inside, temporary, cover]
Index: []
```
```python
%%time
# -- copy from S3 into the target table/database
# see: https://docs.aws.amazon.com/redshift/latest/dg/t_loading-tables-from-s3.html
copy = f"""
copy {SCHEMA_DST}.{TABLE_DST}
from '{TARGET_S3}'
access_key_id '{ACCESS_KEY_ID}'
secret_access_key '{SECRET_ACCESS_KEY}'
delimiter '\t'
maxerror 10;
"""
cur_dst.execute(copy)
cur_dst.close()
conn_dst.commit()  # psycopg2 does not autocommit, so persist the TRUNCATE and COPY
```

Output:

```
CPU times: user 1.41 ms, sys: 1.65 ms, total: 3.06 ms
Wall time: 8.91 s
```
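With `maxerror 10`, the COPY is allowed to skip up to ten problem rows silently, and because it copies from a key prefix it will also try to read the manifest file that the UNLOAD wrote under the same prefix (adding the `manifest` option to the COPY avoids that and loads exactly the unloaded data files). If the destination count comes up short, as it does below, the Redshift system table `stl_load_errors` shows which rows were rejected and why. A quick check, assuming the connected user can read the STL tables:

```python
# Inspect the most recent load errors on the destination cluster.
errors = pd.read_sql(
    """
    select starttime, filename, line_number, colname, err_reason
    from stl_load_errors
    order by starttime desc
    limit 10;
    """,
    con=conn_dst,
)
errors
```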
```python
# -- check
pd.read_sql(f'select count(*) from {SCHEMA_DST}.{TABLE_DST}', con=conn_dst)
```

Output:

```
     count
0  1731522
```

The destination ends up with 1,731,522 rows against 1,731,523 in the source. A shortfall like this is exactly what `maxerror 10` permits, so it is worth tracing the missing row through `stl_load_errors` (see the check above) before trusting the clone.
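Finally, a small sanity check that compares the two counts directly and closes both connections. This is purely illustrative and not part of the original notebook:

```python
# Compare row counts between source and destination, then clean up the connections.
n_src = pd.read_sql(f'select count(*) from {SCHEMA_SRC}.{TABLE_SRC};', con=conn_src).iloc[0, 0]
n_dst = pd.read_sql(f'select count(*) from {SCHEMA_DST}.{TABLE_DST};', con=conn_dst).iloc[0, 0]
print(f'source={n_src}  destination={n_dst}  missing={n_src - n_dst}')

conn_src.close()
conn_dst.close()
```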
(Notebook environment: Python 3.8.5, nteract 0.26.0.)