Skip to content

Instantly share code, notes, and snippets.

@mencargo
Created May 8, 2025 17:39
Show Gist options
  • Save mencargo/9ddbb5440a802761c7871a89e9dd190b to your computer and use it in GitHub Desktop.
Save mencargo/9ddbb5440a802761c7871a89e9dd190b to your computer and use it in GitHub Desktop.
Python Script for Snowflake to Snowflake Table Clone
import snowflake.connector
import pandas as pd
from tqdm import tqdm
import os
# Get account with: SELECT CURRENT_ORGANIZATION_NAME() || '-' || CURRENT_ACCOUNT_NAME();
# Source Snowflake account credentials
source_account = ''
source_user = ''
source_password = ''
source_role = ''
source_database = ''
source_schema = ''
# Destination Snowflake account credentials
destination_account = ''
destination_user = ''
destination_password = ''
destination_role = ''
destination_database = ''
destination_schema = ''
# List of tables to copy
tables = ['','','','','','','','']
try:
# Connect to source Snowflake account
source_conn = snowflake.connector.connect(
account=source_account,
user=source_user,
password=source_password,
role=source_role,
database=source_database,
schema=source_schema
)
print("Connected to source Snowflake account successfully.")
except snowflake.connector.errors.Error as e:
print(f"Failed to connect to source Snowflake account: {e}")
exit(1) # Exit the script if connection fails
try:
# Connect to destination Snowflake account
destination_conn = snowflake.connector.connect(
account=destination_account,
user=destination_user,
password=destination_password,
role=destination_role,
database=destination_database,
schema=destination_schema
)
print("Connected to destination Snowflake account successfully.")
except snowflake.connector.errors.Error as e:
print(f"Failed to connect to destination Snowflake account: {e}")
exit(1) # Exit the script if connection fails
try:
for table in tables:
print(f"#### {table} ####")
# Temporary CSV file
csv_file = f'{table}.csv'
# Fetch data from source table
source_cursor = source_conn.cursor()
source_cursor.execute(f'SELECT * FROM {table}')
data = []
columns = [desc[0] for desc in source_cursor.description]
for row in tqdm(source_cursor, desc=f"Fetching data", unit=" rows", unit_scale=True):
data.append(row)
# Write data to CSV file
print(f"Writing csv file")
df = pd.DataFrame(data, columns=columns)
df.to_csv(csv_file, index=False, header=False)
# Create and prepare the destination table
print(f"Create or replace table in destination")
destination_cursor = destination_conn.cursor()
create_table_query = f"""
CREATE OR REPLACE TABLE {table} (
{', '.join([f'"{col}" STRING' for col in columns])}
)
"""
destination_cursor.execute(create_table_query)
# Stage the CSV file in Snowflake
stage_name = 'tmp_copy'
print(f"Uploading file to stage: {stage_name}")
destination_cursor.execute(f'CREATE OR REPLACE STAGE {stage_name}')
destination_cursor.execute(f'PUT file://{os.path.abspath(csv_file)} @{stage_name}')
# Copy data from the stage into the destination table
print(f"Copy data into table")
copy_into_query = f"""
COPY INTO {table}
FROM @{stage_name}/{os.path.basename(csv_file)}
FILE_FORMAT = (TYPE = CSV, FIELD_OPTIONALLY_ENCLOSED_BY = '"', ESCAPE_UNENCLOSED_FIELD = None)
"""
destination_cursor.execute(copy_into_query)
print(f"Data copied successfully!")
# Clean up the temporary CSV file
if os.path.exists(csv_file):
os.remove(csv_file)
except snowflake.connector.errors.Error as e:
print(f"An error occurred: {e}")
finally:
# Close all connections and cursors
if 'source_cursor' in locals():
source_cursor.close()
if 'destination_cursor' in locals():
destination_cursor.close()
if 'source_conn' in locals():
source_conn.close()
if 'destination_conn' in locals():
destination_conn.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment