Skip to content

Instantly share code, notes, and snippets.

@wware
Last active December 4, 2015 23:38
Show Gist options
  • Save wware/22574eec552a35abde9c to your computer and use it in GitHub Desktop.
Save wware/22574eec552a35abde9c to your computer and use it in GitHub Desktop.
Install this in /usr/local/lib/python2.7/site-packages or wherever. It's useful.
import logging
import pprint
import traceback
_sql = False
try:
import sqlalchemy
import sqlparse
_sql = True
except ImportError:
pass
if _sql:
class ReindentFilter(sqlparse.filters.ReindentFilter):
def _split_kwds(self, tlist):
"""
>>> sql = "SELECT a, b, c FROM foo JOIN bar ON foo.x = bar.y ORDER BY foo.z"
>>> print sqlparse.format(sql, reindent=True, indent_width=4)
SELECT a,
b,
c
FROM foo
JOIN bar ON foo.x = bar.y
ORDER BY foo.z
"""
split_words = ('FROM', 'STRAIGHT_JOIN$', 'JOIN$', 'AND', 'OR',
'GROUP', 'ORDER', 'UNION', 'VALUES',
'SET', 'BETWEEN', 'EXCEPT', 'HAVING', 'LIMIT',
'ELSE', 'WHEN', 'THEN')
def _next_token(i):
t = tlist.token_next_match(i, sqlparse.tokens.Keyword, split_words,
regex=True)
if t and t.value.upper() == 'BETWEEN':
t = _next_token(tlist.token_index(t) + 1)
if t and t.value.upper() == 'AND':
t = _next_token(tlist.token_index(t) + 1)
return t
idx = 0
token = _next_token(idx)
added = set()
while token:
prev = tlist.token_prev(tlist.token_index(token), False)
offset = 1
if prev and prev.is_whitespace() and prev not in added:
tlist.tokens.pop(tlist.token_index(prev))
offset += 1
uprev = unicode(prev)
if (prev and (uprev.endswith('\n') or uprev.endswith('\r'))):
nl = tlist.token_next(token)
else:
nl = self.nl()
added.add(nl)
tlist.insert_before(token, nl)
offset += 1
token = _next_token(tlist.token_index(nl) + offset)
sqlparse.filters.ReindentFilter = ReindentFilter
def get_logger(stream=None, cache=[]):
"""
>>> import sys
>>> logger = get_logger(stream=sys.stdout)
>>> def bottom(x):
... logger.stack(x)
>>> def middle(x):
... bottom(x)
>>> def top(x):
... middle(x)
>>> top(3)
<...
File "...", line 1, in <module>
top(3)
File "...", line 2, in top
middle(x)
File "...", line 2, in middle
bottom(x)
File "...", line 2, in bottom
logger.stack(x)
3
>>> logger.pprint({
... 'a': 'Now is the winter of our discontent',
... 'b': 'made glorious summer by this sun of York',
... 'c': 3
... })
<...
{'a': 'Now is the winter of our discontent',
'b': 'made glorious summer by this sun of York',
'c': 3}
"""
if cache:
return cache[0]
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
try:
ch = logging.StreamHandler(stream=stream)
except TypeError:
# Python 2.6
ch = logging.StreamHandler(strm=stream)
formatter = logging.Formatter('%(filename)s:%(lineno)s %(message)s')
ch.setFormatter(formatter)
logger.addHandler(ch)
def _pprint(thing, logger=logger):
if logger.isEnabledFor(logging.INFO):
logger._log(logging.INFO, '\n' + pprint.pformat(thing), [])
logger.pprint = _pprint
def _stack(msg=None, logger=logger):
if logger.isEnabledFor(logging.INFO):
stuff = traceback.format_stack()[:-1]
if msg is not None:
stuff.append(str(msg))
else:
stuff[-1] = stuff[-1].rstrip()
logger._log(logging.ERROR, '\n' + ''.join(stuff), [])
logger.stack = _stack
if _sql:
def _sql_log(query, params={}, logger=logger):
from sqlalchemy.orm.query import Query
from sqlalchemy.dialects import postgresql
if logger.isEnabledFor(logging.DEBUG):
if isinstance(query, Query):
comp = query.statement.compile(dialect=postgresql.dialect())
sql = sqlparse.format(str(comp), reindent=True, indent_width=4)
for k in params:
sql = sql.replace(k, str(params[k]))
logger._log(logging.INFO, sql, [])
else:
logger._log(logging.INFO, query, [])
logger._log(logging.INFO, params, [])
logger.sql = _sql_log
def _query(conn, q, params=(), logger=logger):
if logger.isEnabledFor(logging.INFO):
rows = []
try:
cur = conn.cursor()
cur.execute(q, params)
rows = cur.fetchall()
except:
logger.exception('')
logger._log(logging.INFO, '\n' + pprint.pformat(list(rows)), [])
logger.query = _query
cache.append(logger)
return logger
if __name__ == "__main__":
import doctest
import sys
doctest.testmod(verbose=('-v' in sys.argv[1:]), optionflags=doctest.ELLIPSIS)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment