Last active
September 5, 2020 23:39
-
-
Save evan-burke/a5b4bab180a8a26229ac2c67e663072d to your computer and use it in GitHub Desktop.
simple sqlalchemy wrapper
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import configparser
from urllib.parse import quote

import sqlalchemy as sa
# Version string of this wrapper module; see the CHANGELOG comments below.
__sadb_version__ = "0.0.2"
# 2020-09-03:
# Jeremy Howard's 'fastsql' is quite a bit better designed than this, but it's also higher-level - uses more ORM stuff
# https://github.com/fastai/fastsql
# honestly this could probably be a full module...
# TODO:
# update docstring to reflect 0.0.2 changes
# escape special characters in the password if needed, e.g., https://docs.sqlalchemy.org/en/13/core/engines.html#database-urls
# maybe add further convenience method for queries - in run_query, connect if not already connected
# CHANGELOG:
# 0.0.2
# introduced versioning / version string
# added 'query' convenience method, basically just alias for run_query()
# support non-postgres db connections
#
# 0.0.1
# initial release
class sadb:
    """SQLAlchemy wrapper for running raw SQL queries with minimal ceremony.

    Connection details are read from an INI-style config file. More advanced
    things can be done directly with the ``self.engine`` object.

    Config file layout (``[db]`` is the default section; a different section
    can be selected on instantiation). Keep comments on their own lines:
    configparser does not strip inline comments by default, so a trailing
    ``# ...`` would become part of the value. A literal ``%`` in a value must
    be written as ``%%`` because of configparser interpolation::

        [db]
        db_host = a.database.hostname.com
        db_user = qwerty
        db_pass = abcde
        default_db_name = dbname
        # optional SQLAlchemy dialect(+driver); defaults to "postgresql"
        # https://docs.sqlalchemy.org/en/13/dialects/index.html
        dialect = mysql+mysqlconnector

    ``db_user`` and ``db_pass`` are percent-encoded when the connection URL is
    built, so store them raw (not pre-encoded) in the config file.

    Inputs:
        configfile: name of the configuration file with connection details.
        dbname: database name to connect to; leave empty to use
            ``default_db_name`` from the config.
        configsection: section header within the config file; can be used
            for storing multiple sets of credentials.

    # note to self, maybe look at addl improvements from here: (not public)
    # https://github.com/InboxAI/devops/blob/master/database/db_script.py

    Example usage::

        from sadb import sadb

        configfile = "/path/to/config"
        dbinstance = sadb(configfile)

        myq = "select now()"
        with dbinstance as db:
            res = db.run_query(myq)
        print(res)
        # > [{'now': datetime.datetime(2020, 1, 16, 23, 53, 18, 972500, tzinfo=psycopg2.tz.FixedOffsetTimezone(offset=0, name=None))}]

        # alternatively:
        db = dbinstance.connect()
        # ... db.run_query(...)
        db.close()

        # parameterized:
        paramq = '''select customer_id, customer_name
                    from fake_customers
                    where customer_id = :id'''
        customer_id = 5
        with dbinstance as db:
            res = db.run_query(paramq, params={"id": customer_id})
        print(res)
        # > [{'customer_id': 5, 'customer_name': 'Average Products LLC'}]
    """

    def __init__(self, configfile, dbname=None, configsection="db"):
        """Store the settings and parse the config file; no connection is made."""
        self.configfile = configfile
        self.dbname = dbname
        self.configsection = configsection
        self.dialect = None
        # No engine exists until __enter__()/connect() runs. Initialising it
        # here lets close()/disconnect()/__exit__ be safe no-ops beforehand.
        self.engine = None
        self.config = configparser.ConfigParser()
        # ConfigParser.read() silently skips files it cannot open; a missing
        # file therefore surfaces later as a KeyError on the section lookup.
        self.config.read(configfile)

    @staticmethod
    def _format_dsn_url(dsn):
        """Build the SQLAlchemy URL from the DSN parts, escaping credentials."""
        parts = dict(dsn)
        # Percent-encode user and password so characters such as '@', ':' or
        # '/' cannot be mistaken for URL delimiters. The host is left alone
        # because it may legitimately carry a ':port' suffix.
        for key in ("db_user", "db_pass"):
            parts[key] = quote(dsn[key], safe="")
        return "{dialect}://{db_user}:{db_pass}@{db_host}/{db}".format(**parts)

    def _dispose_engine(self):
        """Dispose of the engine if one was created; otherwise do nothing."""
        if self.engine is not None:
            self.engine.dispose()

    def __enter__(self):
        """Set up the db engine and return this instance.

        Raises:
            KeyError: if the config section or a required option is missing.
            ValueError: if any DSN component is empty.
        """
        section = self.config[self.configsection]
        if self.dbname is None:
            self.dbname = section["default_db_name"]
        # SA refers to this as "dialect+driver", e.g., mysql+mysqldb,
        # but we'll just call it dialect for simplicity. Default: postgres.
        self.dialect = section.get("dialect", "postgresql")
        dsn = {
            "db_host": section["db_host"],
            "db_user": section["db_user"],
            "db_pass": section["db_pass"],
            "db": self.dbname,
            "dialect": self.dialect,
        }
        for name, value in dsn.items():
            if not value:
                raise ValueError("Error: DSN value for '" + name + "' is empty")
        self.engine = sa.create_engine(self._format_dsn_url(dsn))
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Dispose of the engine; exceptions from the with-body propagate."""
        self._dispose_engine()

    def connect(self):
        """Alternative to ``with dbinstance as db``; returns this instance.

        Make sure to manually call close()/disconnect() when done.
        """
        return self.__enter__()

    def disconnect(self):
        """Convenience function: dispose of the engine (no-op if none exists)."""
        self._dispose_engine()

    def close(self):
        """Convenience function: dispose of the engine (no-op if none exists)."""
        self._dispose_engine()

    def run_query(self, query, params=None):
        """Execute a raw SQL query and return its rows.

        Query parameterization uses the named colon format, e.g.
        "AND users.name BETWEEN :x AND :y"; pass ``params`` as a dict of
        values matching these names.

        Args:
            query: SQL string, or an already-built SQLAlchemy ``text()`` clause.
            params: optional dict of bind parameters; an empty dict is treated
                the same as no parameters.

        Returns:
            A list of dicts (one per row) when the statement returns rows,
            otherwise None.
        """
        if self.engine is None:
            # Connect on demand so a forgotten connect() does not fail here.
            # The caller is still responsible for close()/disconnect().
            self.connect()
        if not isinstance(query, sa.sql.elements.TextClause):
            # convert to correct class:
            query = sa.sql.text(query)
        # NOTE(review): Engine.execute() is the legacy "connectionless" API;
        # it was removed in SQLAlchemy 2.0, so this wrapper targets 1.x.
        if params:
            results = self.engine.execute(query, params)
        else:
            results = self.engine.execute(query)
        try:
            if results.returns_rows:
                return [dict(row) for row in results]
            return None
        finally:
            # Always release the result, even if row conversion fails.
            results.close()

    def query(self, *args, **kwargs):
        """Convenience alias for run_query(); forwards all arguments."""
        return self.run_query(*args, **kwargs)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment