Created
May 8, 2025 17:39
-
-
Save mencargo/9ddbb5440a802761c7871a89e9dd190b to your computer and use it in GitHub Desktop.
Python Script for Snowflake to Snowflake Table Clone
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import csv
import os

import pandas as pd
import snowflake.connector
from tqdm import tqdm
# Get account with: SELECT CURRENT_ORGANIZATION_NAME() || '-' || CURRENT_ACCOUNT_NAME(); | |
# Source Snowflake account credentials | |
source_account = '' | |
source_user = '' | |
source_password = '' | |
source_role = '' | |
source_database = '' | |
source_schema = '' | |
# Destination Snowflake account credentials | |
destination_account = '' | |
destination_user = '' | |
destination_password = '' | |
destination_role = '' | |
destination_database = '' | |
destination_schema = '' | |
# List of tables to copy | |
tables = ['','','','','','','',''] | |
try: | |
# Connect to source Snowflake account | |
source_conn = snowflake.connector.connect( | |
account=source_account, | |
user=source_user, | |
password=source_password, | |
role=source_role, | |
database=source_database, | |
schema=source_schema | |
) | |
print("Connected to source Snowflake account successfully.") | |
except snowflake.connector.errors.Error as e: | |
print(f"Failed to connect to source Snowflake account: {e}") | |
exit(1) # Exit the script if connection fails | |
try: | |
# Connect to destination Snowflake account | |
destination_conn = snowflake.connector.connect( | |
account=destination_account, | |
user=destination_user, | |
password=destination_password, | |
role=destination_role, | |
database=destination_database, | |
schema=destination_schema | |
) | |
print("Connected to destination Snowflake account successfully.") | |
except snowflake.connector.errors.Error as e: | |
print(f"Failed to connect to destination Snowflake account: {e}") | |
exit(1) # Exit the script if connection fails | |
try: | |
for table in tables: | |
print(f"#### {table} ####") | |
# Temporary CSV file | |
csv_file = f'{table}.csv' | |
# Fetch data from source table | |
source_cursor = source_conn.cursor() | |
source_cursor.execute(f'SELECT * FROM {table}') | |
data = [] | |
columns = [desc[0] for desc in source_cursor.description] | |
for row in tqdm(source_cursor, desc=f"Fetching data", unit=" rows", unit_scale=True): | |
data.append(row) | |
# Write data to CSV file | |
print(f"Writing csv file") | |
df = pd.DataFrame(data, columns=columns) | |
df.to_csv(csv_file, index=False, header=False) | |
# Create and prepare the destination table | |
print(f"Create or replace table in destination") | |
destination_cursor = destination_conn.cursor() | |
create_table_query = f""" | |
CREATE OR REPLACE TABLE {table} ( | |
{', '.join([f'"{col}" STRING' for col in columns])} | |
) | |
""" | |
destination_cursor.execute(create_table_query) | |
# Stage the CSV file in Snowflake | |
stage_name = 'tmp_copy' | |
print(f"Uploading file to stage: {stage_name}") | |
destination_cursor.execute(f'CREATE OR REPLACE STAGE {stage_name}') | |
destination_cursor.execute(f'PUT file://{os.path.abspath(csv_file)} @{stage_name}') | |
# Copy data from the stage into the destination table | |
print(f"Copy data into table") | |
copy_into_query = f""" | |
COPY INTO {table} | |
FROM @{stage_name}/{os.path.basename(csv_file)} | |
FILE_FORMAT = (TYPE = CSV, FIELD_OPTIONALLY_ENCLOSED_BY = '"', ESCAPE_UNENCLOSED_FIELD = None) | |
""" | |
destination_cursor.execute(copy_into_query) | |
print(f"Data copied successfully!") | |
# Clean up the temporary CSV file | |
if os.path.exists(csv_file): | |
os.remove(csv_file) | |
except snowflake.connector.errors.Error as e: | |
print(f"An error occurred: {e}") | |
finally: | |
# Close all connections and cursors | |
if 'source_cursor' in locals(): | |
source_cursor.close() | |
if 'destination_cursor' in locals(): | |
destination_cursor.close() | |
if 'source_conn' in locals(): | |
source_conn.close() | |
if 'destination_conn' in locals(): | |
destination_conn.close() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment