@silberman
Created October 22, 2020 23:43
"""
Python2 example script for loading data from a .csv file into your Mozart Snowflake instance.
To use, specify the 7 constants defined at the top:
SNOWFLAKE_ACCOUNT, SNOWFLAKE_USERNAME, SNOWFLAKE_PASSWORD, and SNOWFLAKE_DATABASE come from https://app.mozartdata.com/export/mode
TARGET_SCHEMA and TARGET_TABLE are where we'll be writing to
CSV_FILEPATH is the full path to the csv file we'll be uploading
We need to install 2 Python packages (order matters due to a version conflict, so install idna first):
    pip install --upgrade idna==2.8
    pip install snowflake-connector-python
Notes:
* The table we're COPYing into must already exist. Create it in Snowflake's console,
  or in code by modifying the example table definition and calling the create_table() function here.
* This uses the "internal table stage": https://docs.snowflake.com/en/user-guide/data-load-local-file-system-create-stage.html#table-stages
"""
import snowflake.connector

# Connection info - from https://app.mozartdata.com/export/mode
SNOWFLAKE_ACCOUNT = "???"
SNOWFLAKE_USERNAME = "???"
SNOWFLAKE_PASSWORD = "???"
SNOWFLAKE_DATABASE = "???"

# Schema and table in which we'll put this data
TARGET_SCHEMA = "csvs"
TARGET_TABLE = "???"

# Full path to the csv file we'll be uploading
CSV_FILEPATH = "???"


def make_snowflake_connection():
    conn = snowflake.connector.connect(
        user=SNOWFLAKE_USERNAME,
        password=SNOWFLAKE_PASSWORD,
        account=SNOWFLAKE_ACCOUNT,
        database=SNOWFLAKE_DATABASE
    )
    return conn


def create_table():
    # Create the target schema and table if they don't exist yet.
    # Edit the example column definitions to match your csv.
    conn = make_snowflake_connection()
    try:
        result = conn.cursor().execute("CREATE SCHEMA IF NOT EXISTS %s" % TARGET_SCHEMA).fetchall()
        # print(x) with a single argument behaves the same on Python 2 and 3
        print(result)
        result = conn.cursor().execute("CREATE TABLE IF NOT EXISTS %s.%s (user_id number, name varchar, anumber number)" % (TARGET_SCHEMA, TARGET_TABLE)).fetchall()
        print(result)
    finally:
        conn.close()


def put_file_on_stage(filepath):
    conn = make_snowflake_connection()
    try:
        result = conn.cursor().execute("USE SCHEMA %s" % TARGET_SCHEMA).fetchall()
        print(result)
        # Upload the local file to the table's internal stage (@%<table>)
        to_execute = "put file://" + filepath + " @%" + TARGET_TABLE + " auto_compress=true"
        result = conn.cursor().execute(to_execute).fetchall()
        print(result)
        # Load the staged file into the target table
        result = conn.cursor().execute("copy into %s file_format = (type = csv)" % TARGET_TABLE).fetchall()
        print(result)
    finally:
        conn.close()


if __name__ == "__main__":
    # create_table()
    put_file_on_stage(CSV_FILEPATH)
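
The PUT and COPY statements in put_file_on_stage() are assembled by plain string concatenation, so a table name with unexpected characters or a file path containing spaces can produce invalid SQL. The following is a minimal sketch of one way to build the same two statements more defensively; it is not part of the original script. The helper names check_identifier and build_load_statements and the skip_header parameter are hypothetical, and the sketch assumes simple unquoted Snowflake identifiers and a path without single quotes. It also schema-qualifies the table stage, which is an assumption about how you might want to avoid a separate USE SCHEMA call.

```python
import re

# Simple unquoted identifier: starts with a letter or underscore, then
# letters, digits, underscores, or dollar signs. (Assumption: quoted
# identifiers are out of scope for this sketch.)
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*\Z")


def check_identifier(name):
    """Hypothetical helper: reject anything that is not a simple identifier."""
    if not _IDENTIFIER.match(name):
        raise ValueError("not a simple unquoted identifier: %r" % (name,))
    return name


def build_load_statements(filepath, schema, table, skip_header=0):
    """Hypothetical helper: return [PUT statement, COPY statement] as strings."""
    check_identifier(schema)
    check_identifier(table)
    if "'" in filepath:
        raise ValueError("single quotes in the file path are not handled here")

    # Schema-qualified table stage, e.g. @csvs.%users
    stage = "@{0}.%{1}".format(schema, table)
    # Quoting the whole file URI allows paths that contain spaces
    put_sql = "PUT 'file://{0}' {1} AUTO_COMPRESS = TRUE".format(filepath, stage)
    # SKIP_HEADER = 1 skips a header row; 0 loads every line
    copy_sql = (
        "COPY INTO {0}.{1} FROM {2} "
        "FILE_FORMAT = (TYPE = CSV SKIP_HEADER = {3:d})"
    ).format(schema, table, stage, skip_header)
    return [put_sql, copy_sql]
```

Each returned string could then be passed to conn.cursor().execute(...) in order, in the same way put_file_on_stage() runs its statements. The sketch only builds strings, so it can be checked without a Snowflake connection.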