Last active
August 29, 2015 14:27
-
-
Save miraculixx/dffd8ef96d070bf83baa to your computer and use it in GitHub Desktop.
simple Django-like ORM for MySQL for use with Pandas
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
The MIT License (MIT) | |
Copyright (c) 2015 miraculixx at github.com | |
Permission is hereby granted, free of charge, to any person obtaining a copy | |
of this software and associated documentation files (the "Software"), to deal | |
in the Software without restriction, including without limitation the rights | |
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | |
copies of the Software, and to permit persons to whom the Software is | |
furnished to do so, subject to the following conditions: | |
The above copyright notice and this permission notice shall be included in | |
all copies or substantial portions of the Software. | |
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | |
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | |
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | |
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | |
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN | |
THE SOFTWARE. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
simple Django-like ORM for MySQL for use with Pandas | |
(c) 2015 miraculixx at github.com | |
Purpose | |
------- | |
Access MySQL tables as if they were Python objects. In principle | |
works like the Django ORM, however it only supports the very basics: | |
* filter and exclude by kwargs | |
* Q objects | |
* no joins | |
* no aggregates | |
And most importantly, directly transform a query to a Pandas dataframe like so: | |
sql.objects('auth_user').filter(username='admin').as_dataframe() | |
Mini Tutorial | |
------------- | |
# setup | |
import pandas as pd | |
from miniorm import SQL | |
sql = SQL('mysql://admin:1234@localhost/mydb') | |
# get all tables in database as a dataframe | |
tables = sql.tables.as_dataframe() | |
# get a specific table and filter for certain objects | |
sql.objects('auth_user').filter(username='admin').as_dataframe() | |
# get rows in table | |
rows = sql.objects('auth_user').execute().rows | |
=> list of row dicts [{ 'col1' : val1 }, ... ] | |
# or process Resultset row by row, implies execute() | |
for row in sql.objects('auth_user'): | |
print row | |
# as in Django, you can also create Q objects | |
q1 = Q(foo='bar', baz=5) | |
q2 = Q(foz='baz') | |
rows = sql.objects('auth_user').filter(q1 | q2) | |
Why? | |
---- | |
* I am familiar with the Django ORM and how to use it with Pandas. | |
* I didn't want to learn yet another syntax using either pd.read_sql, MySQLdb or SQLAlchemy. | |
* It's fun to write your own ORM once in a while | |
Licence | |
------- | |
MIT license. For details see the LICENSE file. | |
""" | |
import MySQLdb as db | |
import re | |
import copy | |
class Q(object):
    """
    composable WHERE-clause fragment, modelled on Django's Q objects

    Keyword arguments are lookups of the form ``column`` or
    ``column__<op>`` where ``<op>`` is one of eq, lt, lte, gt, gte, ne,
    not, in, isin, contains. Several lookups in one Q are ANDed.

    Q objects combine with ``&``, ``|`` and ``~``. Every combination
    returns a new Q wrapping deep copies of its operands, so the operands
    are never modified and each operand keeps its own parentheses.

    NOTE: values are interpolated into the SQL text without escaping;
    do not pass untrusted input.
    """

    # lookups that translate one-to-one into a binary SQL operator
    _COMPARISONS = {
        'eq': '=',
        'lt': '<',
        'lte': '<=',
        'gt': '>',
        'gte': '>=',
        'ne': '<>',
    }

    def __init__(self, *args, **kwargs):
        # positional arguments are accepted but not used
        self._conditions = kwargs
        # (operator, Q) pairs; the first entry stands for this object's
        # own conditions, any further entries are child expressions
        self.qlist = [('', self)]
        # should we return ~(conditions)
        self._inv = False

    def __repr__(self):
        return 'Q: %s' % self.as_sql()

    def negate(self):
        """
        mark this object as negated (in place) and return it
        """
        self._inv = True
        return self

    def _combine(self, op, other):
        """
        return a new Q grouping copies of self and other joined by op

        A fresh parent node is used instead of appending to a copy of
        self, so that ``(a | b) & c`` keeps the parentheses around
        ``a | b`` rather than relying on SQL operator precedence.
        """
        if not isinstance(other, Q):
            return NotImplemented
        node = Q()
        node.qlist.append(('', copy.deepcopy(self)))
        node.qlist.append((op, copy.deepcopy(other)))
        return node

    def __and__(self, other):
        return self._combine('AND', other)

    def __or__(self, other):
        return self._combine('OR', other)

    def __invert__(self):
        """
        return an inverted version of this object
        """
        notq = Q().negate()
        notq.qlist.append(('', copy.deepcopy(self)))
        return notq

    def as_sql(self):
        """
        return the parenthesized SQL expression, or '' if there is none
        """
        parts = []
        for op, q in self.qlist:
            # the entry pointing back at this object means "own conditions"
            sql = self.build_conditions() if q is self else q.as_sql()
            if not sql:
                # empty operands are skipped so no dangling AND/OR remains
                continue
            if op and parts:
                sql = '%s %s' % (op, sql)
            parts.append(sql)
        if not parts:
            return ''
        body = ' '.join(parts)
        if self._inv:
            # negate the whole group, not just its first term
            body = 'NOT (%s)' % body
        return '(%s)' % body

    @staticmethod
    def _literal(v):
        """
        render a value as SQL text: all-digit values bare, the rest quoted
        """
        text = str(v)
        return text if text.isdigit() else '"%s"' % v

    def build_conditions(self):
        """
        transform this object's own lookups into SQL, joined by AND

        Negation and child expressions are handled by as_sql(). The result
        is wrapped in parentheses only when this object also has children.
        Raises ValueError for an unknown lookup operator.
        """
        cond = []
        for key, v in self._conditions.items():
            # split on the last '__' so the operator is always the suffix
            k, sep, op = key.rpartition('__')
            if not sep:
                k, op = key, 'eq'
            if op in self._COMPARISONS:
                cond.append('%s%s%s' % (k, self._COMPARISONS[op],
                                        self._literal(v)))
            elif op == 'not':
                val = 'NULL' if v is None else self._literal(v)
                cond.append('%s is not %s' % (k, val))
            elif op in ('in', 'isin'):
                if isinstance(v, (list, tuple, set, frozenset)):
                    values = v
                else:
                    values = [v]
                if values:
                    items = ','.join(self._literal(x) for x in values)
                    cond.append('%s in (%s)' % (k, items))
                else:
                    # "in ()" is not valid SQL; an empty list matches nothing
                    cond.append('1=0')
            elif op == 'contains':
                cond.append('%s like "%%%s%%"' % (k, v))
            else:
                # dropping the lookup silently would widen the query
                raise ValueError('unsupported lookup %r' % key)
        sql = ' AND '.join(cond)
        return '(%s)' % sql if (sql and len(self.qlist) > 1) else sql
class Resultset(list):
    """
    rows of an executed cursor, fetched lazily on first use

    The list itself holds the raw rows as returned by cursor.fetchall();
    iterating yields every row as a {column: value} dict. Rows are
    fetched from the cursor only once, so iterating or reading count /
    rows repeatedly never duplicates them.
    """

    def __init__(self, cursor):
        super(Resultset, self).__init__()
        self._cur = cursor
        # True once cursor.fetchall() has been consumed
        self._fetched = False
        # position of the row-dict iterator
        self._n = 0

    @property
    def cursor(self):
        return self._cur

    @property
    def columns(self):
        """
        tuple of column names, empty if the cursor has no description
        """
        description = self._cur.description
        if not description:
            return ()
        return tuple(col[0] for col in description)

    @property
    def rows(self):
        """
        all rows as a plain list of dicts
        """
        return list(self)

    @property
    def count(self):
        """
        number of rows in the result
        """
        self._load()
        return len(self)

    def _load(self):
        """
        pull all rows from the cursor, first call only
        """
        if not self._fetched:
            self.extend(self._cur.fetchall())
            self._fetched = True

    def __iter__(self):
        self._load()
        self._n = 0
        return self

    def _row_to_dict(self, row):
        return dict(zip(self.columns, row))

    def next(self):
        if self._n < len(self):
            row = self[self._n]
            self._n += 1
            return self._row_to_dict(row)
        raise StopIteration

    # Python 3 spelling of the iterator protocol
    __next__ = next
class Queryset(object):
    """
    lazily built SELECT statement on a single table

    filter() and exclude() narrow the queryset in place and return it, so
    calls can be chained. Nothing is sent to the database before
    execute(), iteration or as_dataframe().
    """

    # statement layout used when no SQL object is attached
    _SQL_TEMPLATE = '\nselect {columns}\nfrom {table}\nwhere {where}\n'

    def __init__(self, sql=None, table=None, columns=None):
        self._q = None
        self._sql = sql
        self._table = table or '<missing table>'
        self._columns = columns or '*'

    def __repr__(self):
        return 'QS: %s' % self.as_sql()

    def __iter__(self):
        return iter(self.execute())

    def all(self):
        return self

    def _apply(self, args, kwargs, negate):
        """
        AND the given Q objects and lookups together, optionally negate
        the result, and AND it onto the current criteria
        """
        criteria = [q for q in args if q is not None]
        if kwargs:
            criteria.append(Q(**kwargs))
        if not criteria:
            # nothing to add; avoids an empty where fragment
            return self
        combined = criteria[0]
        for q in criteria[1:]:
            combined = combined & q
        if negate:
            combined = ~combined
        self._q = combined if self._q is None else self._q & combined
        return self

    def filter(self, *args, **kwargs):
        """
        keep only rows matching all given Q objects and keyword lookups
        """
        return self._apply(args, kwargs, negate=False)

    def exclude(self, *args, **kwargs):
        """
        drop rows matching all given Q objects and keyword lookups
        """
        return self._apply(args, kwargs, negate=True)

    def _render(self, sql, table, columns):
        """
        build the statement text, using sql to format it when available
        """
        table = table or self._table
        columns = columns or self._columns
        where = self._q.as_sql() if self._q is not None else ''
        # no criteria at all selects every row
        where = where or '1'
        if sql is not None:
            return sql._prepare_sql(table, columns, where)
        return self._SQL_TEMPLATE.format(table=table, columns=columns,
                                         where=where)

    def as_sql(self, table=None, columns=None):
        """
        return the SELECT statement; table and columns override the
        values given at construction
        """
        return self._render(self._sql, table, columns)

    def execute(self, sql=None, table=None, columns=None):
        """
        run the query and return a Resultset

        sql, table and columns override the values given at construction.
        Raises AssertionError if no SQL connection object is available.
        """
        sql = sql or self._sql
        if not sql:
            # raised explicitly so the check also runs under python -O
            raise AssertionError(
                "need an SQL connection to perform this query")
        return Resultset(sql._execute_sql(self._render(sql, table, columns)))

    def as_dataframe(self):
        """
        run the query and return the result as a pandas DataFrame
        """
        import pandas as pd
        results = self.execute()
        data = list(results) if results.count else None
        return pd.DataFrame(data, columns=list(results.columns))
class SQL(object):
    """
    entry point of the ORM: wraps one MySQL connection

    The connection string has the form
    mysql://user:passwd@host[:port]/db and the connection itself is
    opened on the first query.
    """

    # the user ends at the first ':', the password at the last '@'
    _CONNSTR = re.compile(
        r'^mysql://(?P<user>[^:]*):(?P<passwd>.*)@'
        r'(?P<host>[^/:]+)(?::(?P<port>\d+))?/(?P<db>.*)$')

    def __init__(self, connstr):
        self.connstr = connstr
        self.conn = None

    def connect(self):
        """
        open the connection on first use and return it
        """
        if self.conn is None:
            self.conn = db.connect(**self.dbparams)
        return self.conn

    @property
    def dbparams(self):
        """
        connection string parsed into keyword arguments for db.connect

        Always holds user, passwd, host and db; port (as int) is included
        only when the connection string specifies one. Raises ValueError
        if the connection string cannot be parsed.
        """
        match = self._CONNSTR.match(self.connstr)
        if match is None:
            raise ValueError(
                'connection string must look like '
                'mysql://user:passwd@host[:port]/db')
        params = match.groupdict()
        port = params.pop('port')
        if port is not None:
            params['port'] = int(port)
        return params

    def objects(self, table):
        """
        return a Queryset on the given table
        """
        return Queryset(sql=self, table=table)

    @property
    def tables(self):
        """
        Queryset on information_schema.tables limited to this database
        """
        schema = self.dbparams['db']
        return Queryset(sql=self, table='information_schema.tables').filter(
            table_schema=schema)

    def _prepare_sql(self, table, columns, where):
        opts = dict(table=table, columns=columns, where=where)
        sql = """
select {columns}
from {table}
where {where}
""".format(**opts)
        return sql

    def _execute_sql(self, sql):
        """
        execute the statement and return the open cursor
        """
        conn = self.connect()
        cur = conn.cursor()
        cur.execute(sql)
        return cur
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# --- demo: composing Q objects -------------------------------------------
# every print uses a single pre-formatted argument so the output is the
# same under Python 2 and Python 3
q1 = Q(foo='bar', baz=5)
q2 = Q(foz='baz')
q3 = Q(baz='foo')
nq1 = ~q1
print("q1 %s" % (q1,))
print("q1 & q2 %s" % (q1 & q2,))
print("q1 | q2 %s" % (q1 | q2,))
print("(q1 | q2) & q3 %s" % ((q1 | q2) & q3,))
print("nq1 %s" % (nq1,))
print("nq1 & q1 %s" % (nq1 & q1,))
print("((q1 | q2) & q3) %s" % (((q1 | q2) & q3),))
print("(~q1 | q2) %s" % ((~q1 | q2),))
print("(~(q1 | q2)) %s" % ((~(q1 | q2)),))
print("(~(q1 | q2)) | q3 %s" % ((~(q1 | q2)) | q3,))

# --- demo: building up a queryset (no database access) -------------------
qs = Queryset()
qs.filter(q1)
print(qs)
qs.filter(q2)
print(qs)
qs.exclude(q2 | q3)
print(qs)

# --- demo: querying a live database --------------------------------------
# NOTE: the statements from list(...) onwards execute queries, so they need
# a MySQL server reachable with the connection string below.
import pandas as pd
sql = SQL('mysql://admin:1234@localhost/shrebo')
print(sql.tables.filter(table_name__contains='auth_group'))
pd.DataFrame(list(sql.tables.filter(table_name='auth_group')))
sql.objects('auth_user').filter(username='admin').as_dataframe()
sql.objects('auth_user').execute().rows
for row in sql.objects('auth_user'):
    print(row)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment