It accepts a query
string as well as an is_sql
boolean flag. If is_sql
is set to True
, the query is executed as a SQL
query and the results are returned as a list of dictionaries. If is_sql
is False
, the query is executed as a NoSQL
query and the results are returned as a collection of dictionaries.
import snowflake.connector
import json
SNOWFLAKE_ACCOUNT = "your_account"
SNOWFLAKE_USER = "your_user"
SNOWFLAKE_PASSWORD = "your_password"
SNOWFLAKE_DATABASE = "your_database"
SNOWFLAKE_SCHEMA = "your_schema"
SNOWFLAKE_WAREHOUSE = "your_warehouse"
# SQL queries
SQL_QUERY_1 = "SELECT * FROM your_table WHERE some_column = 'some_value'"
SQL_QUERY_2 = "SELECT column_1, column_2 FROM your_table WHERE some_column = 'some_value'"
# NoSQL queries
NOSQL_QUERY_1 = "SELECT * FROM mongo.my_db.users"
NOSQL_QUERY_2 = "SELECT fullName FROM mongo.my_db.users"
def execute_query(query, is_sql=True):
if is_sql:
conn = snowflake.connector.connect(
user=SNOWFLAKE_USER,
password=SNOWFLAKE_PASSWORD,
account=SNOWFLAKE_ACCOUNT,
database=SNOWFLAKE_DATABASE,
schema=SNOWFLAKE_SCHEMA,
warehouse=SNOWFLAKE_WAREHOUSE
)
else:
conn = snowflake.connector.connect(
user=SNOWFLAKE_USER,
password=SNOWFLAKE_PASSWORD,
account=SNOWFLAKE_ACCOUNT
)
conn.cursor().execute(f"USE DATABASE {SNOWFLAKE_DATABASE}")
conn.cursor().execute(f"USE SCHEMA {SNOWFLAKE_SCHEMA}")
with conn.cursor() as cur:
cur.execute(query)
rows = cur.fetchall()
columns = [col[0] for col in cur.description]
results = [dict(zip(columns, row)) for row in rows]
json_results = json.dumps(results, indent=2)
return json_results
This is a set of utility functions for working with Snowflake using Python.
To use these utilities, you'll need to have the snowflake-connector-python
package installed.
pip install snowflake-connector-python
execute_query()
: The execute_query()
function can be used to execute SQL or NoSQL queries and return the results as a JSON string.
from snowflake_utils import execute_query, SQL_QUERY_1, NOSQL_QUERY_1
# Example usage of the execute_query() function
sql_results = execute_query(SQL_QUERY_1)
nosql_results = execute_query(NOSQL_QUERY_1, is_sql=False)
sql_results = execute_query(SQL_QUERY_1)
nosql_results = execute_query(NOSQL_QUERY_1, is_sql=False)
SQL queries can be stored as constants in the snowflake_utils.py module.
SQL_QUERY_1 = "SELECT * FROM your_table WHERE some_column = 'some_value'"
SQL_QUERY_2 = "SELECT column_1, column_2 FROM your_table WHERE some_column = 'some_value'"
NoSQL queries can also be stored as constants in the snowflake_utils.py module.
NOSQL_QUERY_1 = "SELECT * FROM mongo.my_db.users"
NOSQL_QUERY_2 = "SELECT fullName FROM mongo.my_db.users"
Consider a MongoDB database, which is a NoSQL database. You have a database named my_db
and a collection called users
in the database. Now, to retrieve all users from the users collection in the my_db database in MongoDB using Snowflake, you can use the following NoSQL query:
SELECT * FROM mongo.my_db.users;
Assuming you have already set up the MongoDB External Function in Snowflake and created the users
table in Snowflake to match the schema of the users collection in MongoDB, you can use the execute_query()
function we wrote earlier to execute this NoSQL query and return the results as JSON.