Last active
November 22, 2019 16:26
-
-
Save liquidgenius/7a9be27dead40c42fe565fbaa330c7d5 to your computer and use it in GitHub Desktop.
A minimal Python class that provides a uniform API to both Snowflake and MySQL databases.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import pymysql | |
import pandas as pd | |
from snowflake.sqlalchemy import URL as SFURL | |
from sqlalchemy import create_engine | |
from sqlalchemy.engine.url import URL | |
class OmniDB:
    """ A minimal Python class for a uniform API to both Snowflake and MySQL databases.

    Each backend is represented by a SQLAlchemy engine built with ``create_engine``; queries are run
    through ``pandas.read_sql_query`` and echoed to the console.

    TODO: Error handling
    TODO: Logging"""

    def __init__(self, mysql_settings, sf_settings, persist=True):
        """ Both mysql_settings and sf_settings are passed into the class as dictionaries.

        :param mysql_settings: dict: Keyword arguments for the MySQL engine (host, port, user, password, database).
        :param sf_settings: dict: Keyword arguments for the Snowflake engine (account, user, password, database,
            schema, warehouse, role).
        :param persist: bool: When True (default) both engines are created immediately. When False they are
            created on the first 'persistent' query against the respective backend.
        """
        self.mysql_settings = mysql_settings
        self.sf_settings = sf_settings
        self.mysql_connection = None
        self.sf_connection = None

        if persist:
            self._omnidb_open_connections()

    def _omnidb_open_connections(self):
        """ Creates the engines for both Snowflake and MySQL.

        :return: None: Registers the engines on the instance as mysql_connection and sf_connection.
        """
        self.mysql_connection = self._omnidb_mysql_connection(**self.mysql_settings)
        self.sf_connection = self._omnidb_sf_connection(**self.sf_settings)
        return None

    @staticmethod
    def _omnidb_mysql_connection(host=None, port=None, user=None, password=None, database=None):
        """ Provision an engine for the MySQL database. Explode self.mysql_settings to populate parameters.

        :param host: str: The ip of the host server.
        :param port: int: The port of the host server.
        :param user: str: The username credential.
        :param password: str: The password credential.
        :param database: str: The name of the MySQL database.
        :return: The object returned by sqlalchemy.create_engine for the 'mysql+pymysql' driver.
        """
        # Prefer the URL.create factory when the installed SQLAlchemy provides it; fall back to calling
        # URL directly, which is the only form older releases offer.
        build_url = getattr(URL, 'create', URL)
        return create_engine(build_url(
            drivername='mysql+pymysql',
            host=host,
            port=port,
            username=user,
            password=password,
            database=database,
        ))

    @staticmethod
    def _omnidb_sf_connection(account=None, user=None, password=None, database=None, schema=None,
                              warehouse=None, role=None):
        """ Provision an engine for the Snowflake database. Explode self.sf_settings to populate parameters.

        :param account: str: The Snowflake account id.
        :param user: str: The username credential.
        :param password: str: The password credential.
        :param database: str: The database name.
        :param schema: str: The database schema.
        :param warehouse: str: The warehouse name.
        :param role: str: The role name.
        :return: The object returned by sqlalchemy.create_engine for the Snowflake URL.
        """
        return create_engine(SFURL(
            account=account,
            user=user,
            password=password,
            database=database,
            schema=schema,
            warehouse=warehouse,
            role=role,
        ))

    def omnidb_execute_query(self, database, query, connection_type='persistent'):
        """ A uniform API for execution of queries against MySQL or Snowflake databases.

        :param database: str: The type of database connection. Must be 'mysql' or 'sf'.
        :param query: str: The SQL query to execute.
        :param connection_type: Utilize a 'single' or 'persistent' database query, defaults to 'persistent' to maintain
            the session. A 'single' query builds a fresh engine and disposes of it afterwards.
        :return: pd.DataFrame: The query result formatted as a Pandas DataFrame.
        :raises AssertionError: If database or connection_type is not one of the accepted values.
        """
        # Raised explicitly rather than through `assert` so the checks still run under `python -O`,
        # while callers keep seeing the same exception type and message.
        if database not in ('mysql', 'sf'):
            raise AssertionError("Please provide 'sf' or 'mysql' as the value for database.")
        if connection_type not in ('persistent', 'single'):
            raise AssertionError('Please provide a valid connection_type; persistent or single.')

        # Pick the engine factory, its settings and the instance attribute that caches the engine
        if database == 'mysql':
            factory = self._omnidb_mysql_connection
            settings = self.mysql_settings
            cache_attr = 'mysql_connection'
        else:
            factory = self._omnidb_sf_connection
            settings = self.sf_settings
            cache_attr = 'sf_connection'

        if connection_type == 'single':
            engine = factory(**settings)
            try:
                result = pd.read_sql_query(query, engine)
            finally:
                # create_engine returns an engine, which is released with dispose() and has no close();
                # doing it in `finally` also releases it when the query itself fails.
                engine.dispose()
        else:
            engine = getattr(self, cache_attr)
            if engine is None:
                # The instance was built with persist=False: open the persistent engine on first use
                engine = factory(**settings)
                setattr(self, cache_attr, engine)
            result = pd.read_sql_query(query, engine)

        # Print the results
        self.omnidb_print_result(query, result)
        return result

    @staticmethod
    def omnidb_print_result(query, result):
        """ Prints the query and results to the console.

        :param query: str: The query that was executed.
        :param result: Pandas.DataFrame: The result of the query.
        :return: None
        """
        print(f'QUERY: {query}')

        if not isinstance(result, pd.DataFrame):
            print(f'RESULT: 0 records found\n{result}\n')
            return None

        if 'status' in result.columns and not result.empty:
            # Positional access: the first row is reported whatever the index labels are
            print(f"RESULT: {len(result)} records found\n{result['status'].iloc[0]}\n")
        else:
            # Also reached for an empty frame that has a 'status' column, which has no first row to report
            print(f"RESULT:\n {result.to_string(justify='left', index=False)}\n")
        return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Snowflake Usage
result = self.omnidb_execute_query('sf', query)
MySQL Usage
result = self.omnidb_execute_query('mysql', query)