Created
October 14, 2010 21:27
-
-
Save rslinckx/627092 to your computer and use it in GitHub Desktop.
SQLAlchemy Query debug helper for Flask and Flask-SQLAlchemy
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import logging | |
log = logging.getLogger(__name__) | |
from werkzeug import Response, Template | |
import re | |
import cgi | |
import time | |
import uuid | |
from flask import request, Response, abort | |
from flaskext.sqlalchemy import get_debug_queries | |
try: | |
import sqlparse | |
except ImportError: | |
log.debug('Not using "sqlparse" for sql formatting') | |
sqlparse = None | |
try: | |
import pygments | |
from pygments import highlight as pygments_highlight | |
from pygments.lexers import SqlLexer | |
from pygments.formatters import HtmlFormatter | |
except ImportError: | |
log.debug('Not using "pygments" for sql highlighting') | |
pygments = False | |
# Bind markers recognised in a statement: ``?``, ``%s`` and ``%(name)s``.
BIND_MARKER_RE = re.compile(r'\?|%s|%\([^)]+\)s')


def counter():
    """Yield 0, 1, 2, ... without end."""
    n = 0
    while True:
        yield n
        n += 1


def mangle_sql(sql, params):
    """Return *sql* with each bind marker replaced by the text of its value.

    The result is meant for display only: values are inserted as plain text,
    without any SQL quoting or escaping.

    :param sql: statement containing ``?``, ``%s`` or ``%(name)s`` markers.
    :param params: sequence or mapping of bound values (may be ``None``).
    :returns: the statement with every resolvable marker replaced.

    For each marker the value is looked up, in order, by:

    1. the name inside ``%(name)s`` (named markers only),
    2. the zero-based position of the marker in the statement,
    3. the key ``'param_<position + 1>'``.

    A marker that cannot be resolved (too few parameters, unknown name, a
    literal ``?`` inside a quoted string, ``params`` being ``None``) is left
    untouched instead of raising.
    """
    positions = counter()

    def replace_marker(match):
        marker = match.group(0)
        ix = next(positions)
        candidates = [ix, 'param_%d' % (ix + 1)]
        if marker.startswith('%('):
            # '%(name)s' -> 'name'; try the explicit name before guessing.
            candidates.insert(0, marker[2:-2])
        for key in candidates:
            try:
                value = params[key]
            except (IndexError, KeyError, TypeError):
                # TypeError: wrong key type for this container (e.g. a string
                # key on a tuple) or params is not subscriptable at all.
                continue
            # Text conversion that works without the Python 2-only
            # ``unicode`` builtin; the 1-tuple keeps tuple values intact.
            return u'%s' % (value,)
        return marker

    return BIND_MARKER_RE.sub(replace_marker, sql)
def highlight(sql, params, mangle=True):
    """Render *sql* as an HTML fragment for the debug page.

    :param sql: the statement text.
    :param params: bound parameters, used only when *mangle* is true.
    :param mangle: when true, pass the statement through ``mangle_sql`` first
        so bind markers are replaced by their values.
    :returns: HTML produced by Pygments when it was importable, otherwise the
        HTML-escaped statement wrapped in ``<div><pre>``.

    When ``sqlparse`` was importable the statement is re-indented before
    being rendered.
    """
    text = mangle_sql(sql, params) if mangle else sql
    if sqlparse:
        text = sqlparse.format(text, reindent=True)
    if not pygments:
        # Plain fallback: escape the SQL ourselves.
        return '<div><pre>%s</pre></div>' % cgi.escape(text)
    return pygments_highlight(text, SqlLexer(), HtmlFormatter(noclasses=True))
# HTML page describing one recorded query. Rendered with the context
# ``query`` (a QueryWrapper), ``execute`` (``(milliseconds, row_count)`` or
# None), ``explain`` (``(column_names, rows)`` or None) and ``is_pg``.
# ``execute`` and ``explain`` are None for statements that are not SELECTs,
# so every section that indexes into them is guarded.
# NOTE(review): values are interpolated as written here; confirm whether the
# Template class HTML-escapes ``${...}`` output.
QUERY_TPL = Template('''\
<!DOCTYPE html>
<html>
<head>
<style type="text/css">
h2 {}
#params, #sql, #execute, #explain table {
width: 100%;
border: 1px solid grey;
margin: 0;
padding: 0;
}
pre {
margin: 0;
padding: 5px;
}
table {
border-collapse: collapse
}
td, th {
padding: 0 0 0 5px;
text-align: left;
margin: 0;
}
th {
border: 1px solid grey;
}
</style>
<title>Query Stats: ${query.uid}</title>
</head>
<body>
<h1>Query Stats: ${query.uid}</h1>
<h2>Params</h2>
<div id="params">
<pre>${repr(query.parameters)}</pre>
</div>
<h2>SQL</h2>
<div id="sql">
${query.highlighted_statement}
</div>
<h2>Execution</h2>
<div id="execute">
<% if execute %>
${execute[0]} ms, ${execute[1]} results
<% endif %>
<% if not execute %>
Not executed: only SELECT statements are re-run.
<% endif %>
</div>
<h2>Explain</h2>
<div id="explain">
<% if explain %>
<table>
<tr>
<% for k in explain[0] %>
<th>${k}</th>
<% endfor %>
</tr>
<% for row in explain[1] %>
<tr>
<% for v in row %>
<td><pre>${v}</pre></td>
<% endfor %>
</tr>
<% endfor %>
</table>
<% endif %>
<% if not explain %>
No plan available: only SELECT statements are explained.
<% endif %>
<% if is_pg and explain %>
<form method="post" target="_blank" action="http://explain.depesz.com/new">
<textarea name="explain" style="display: none;">
<% for row in explain[1] %>${row[0]}
<% endfor %>
</textarea>
<input type="submit" value="depesz pretty-print" name="submit">
</form>
<% endif %>
</div>
</body>
</html>
''')
from flask import url_for | |
class QueryWrapper(object):
    """Wrap a recorded query with a uid, a detail-page link and re-run helpers.

    Attributes not found on the wrapper are looked up on the wrapped query
    object ``q``; the wrapper itself reads ``statement`` and ``parameters``
    that way.
    """

    def __init__(self, q, engine):
        """Wrap *q* and compute the URL of its detail page.

        :param q: recorded query exposing ``statement`` and ``parameters``.
        :param engine: object providing ``execute(sql, params)`` and
            ``dialect.name``; used to re-run and explain the statement.
        """
        self.uid = uuid.uuid4().hex
        self.q = q
        self.engine = engine
        self.href = url_for('.__sadebug_query', uid=self.uid, _external=True)
        # Set by execute() from the result's ``query`` attribute, if any.
        self.real_statement = None

    def __getattr__(self, name):
        """Forward unknown attributes to the wrapped query."""
        if name == 'q':
            # Reached only when ``q`` itself is unset (e.g. an instance made
            # without __init__); forwarding would recurse without end.
            raise AttributeError(name)
        return getattr(self.q, name)

    def _is_select(self):
        """Return True if the statement starts with SELECT, ignoring leading whitespace."""
        return self.statement.lstrip().upper().startswith('SELECT')

    def explain(self):
        """Ask the database for the plan of a SELECT statement.

        :returns: ``(column_names, rows)``, or ``None`` when the statement is
            not a SELECT.

        Uses ``EXPLAIN QUERY PLAN`` on the ``sqlite`` dialect and
        ``EXPLAIN ANALYZE`` on every other dialect.
        """
        if not self._is_select():
            return None
        if self.engine.dialect.name == 'sqlite':
            prefix = 'EXPLAIN QUERY PLAN'
        else:
            prefix = 'EXPLAIN ANALYZE'
        result = self.engine.execute('%s %s' % (prefix, self.statement), self.parameters)
        try:
            return result.keys(), result.fetchall()
        finally:
            # Release the result even if reading it fails.
            result.close()

    def execute(self):
        """Re-run a SELECT statement and time it.

        :returns: ``(elapsed_milliseconds, row_count)``, or ``None`` when the
            statement is not a SELECT. The timing covers executing the
            statement and fetching all rows.

        Also stores the result's ``query`` attribute (``None`` if the result
        has none) in ``real_statement``.
        """
        if not self._is_select():
            return None
        started = time.time()
        result = self.engine.execute(self.statement, self.parameters)
        try:
            rows = result.fetchall()
            elapsed = time.time() - started
        finally:
            result.close()
        self.real_statement = getattr(result, 'query', None)
        return elapsed * 1000, len(rows)

    @property
    def highlighted_statement(self):
        """HTML rendering of the statement.

        Uses ``real_statement`` as-is when execute() recorded one; otherwise
        the original statement with its parameters substituted in.
        """
        return highlight(self.real_statement or self.statement, self.parameters, mangle=self.real_statement is None)

    def render(self):
        """Render the detail page; this explains and re-runs the statement."""
        return QUERY_TPL.render(
            query=self,
            explain=self.explain(),
            execute=self.execute(),
            is_pg=self.engine.dialect.name.startswith('postgres'),
        )

    def __repr__(self):
        statement = self.statement
        # Abbreviate only when head and tail would not overlap; otherwise a
        # short statement would appear twice around the '[...]' marker.
        if len(statement) > 60:
            statement = statement[:30] + '[...]' + statement[-30:]
        return 'Query(%s, %r)' % (self.href, statement)
class SaDebug(object):
    """Expose the queries recorded by Flask-SQLAlchemy for debugging.

    When the application runs in debug mode, ``init_app`` registers a
    ``get_debug_queries`` template helper and a ``/__db/<uid>`` page showing
    one query, and optionally logs a per-request query report.
    """

    def __init__(self, max_queries=1000):
        """Create the helper.

        :param max_queries: maximum number of wrapped queries kept for the
            ``/__db/<uid>`` page; the oldest are dropped first. ``None``
            keeps every query (the registry then grows without bound).
        """
        self.queries = {}
        self.max_queries = max_queries
        # uids in insertion order, oldest first, used for eviction.
        self._uid_order = []

    def init_app(self, app, db):
        """Attach to *app*; hooks are only installed when ``app.debug`` is true.

        :param app: the Flask application.
        :param db: object whose ``engine`` attribute is used to re-run queries.

        The ``after_request`` report is installed only if the config key
        ``SADEBUG_REPORT_QUERIES`` is truthy.
        """
        self.app = app
        self.db = db
        if not self.app.debug:
            return
        if app.config.get('SADEBUG_REPORT_QUERIES', False):
            self.app.after_request(self.report_queries)
        self.app.context_processor(self.inject_debug_queries)
        self.app.add_url_rule('/__db/<uid>', endpoint='__sadebug_query', view_func=self.render_query)

    def inject_debug_queries(self):
        """Template context processor exposing ``get_debug_queries``."""
        return dict(get_debug_queries=self.get_debug_queries)

    @staticmethod
    def _abbreviate(text, head=50, tail=70):
        """Shorten *text* to ``head[...]tail`` when it is longer than head + tail."""
        if len(text) <= head + tail:
            # Slicing a short text would repeat the overlapping part.
            return text
        return '%s[...]%s' % (text[:head], text[-tail:])

    def _remember(self, wrapper):
        """Register *wrapper* by uid, evicting the oldest entries over the limit."""
        self.queries[wrapper.uid] = wrapper
        self._uid_order.append(wrapper.uid)
        if self.max_queries is None:
            return
        while len(self._uid_order) > self.max_queries:
            self.queries.pop(self._uid_order.pop(0), None)

    def report_queries(self, response):
        """Log a summary of the recorded queries and return *response* unchanged.

        One entry per query: index, abbreviated statement (newlines removed),
        parameters, duration in milliseconds and the link to its detail page,
        followed by the total duration.
        """
        total = 0
        buf = ['SaDebug report:']
        for i, q in enumerate(self.get_debug_queries()):
            summary = self._abbreviate(q.statement.replace('\n', ''))
            buf.append(' %02d || %s || %r || %dms' % (i, summary, q.parameters, q.duration * 1000))
            buf.append(' -> %s' % q.href)
            total += q.duration
        buf.append(' ----> Total: %dms' % (total * 1000))
        log.info('\n'.join(buf))
        return response

    def get_debug_queries(self):
        """Yield a QueryWrapper for each recorded query, ordered by start time.

        Inside this method the bare name ``get_debug_queries`` refers to the
        module-level import, not to this method. Every wrapper yielded is
        also registered so ``render_query`` can find it by uid.
        """
        for recorded in sorted(get_debug_queries(), key=lambda x: x.start_time):
            wrapper = QueryWrapper(recorded, self.db.engine)
            self._remember(wrapper)
            yield wrapper

    def render_query(self, uid):
        """View for ``/__db/<uid>``: render one registered query, or abort with 404."""
        query = self.queries.get(uid)
        if not query:
            abort(404)
        return Response(query.render(), mimetype='text/html')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment