Skip to content

Instantly share code, notes, and snippets.

@theY4Kman
Last active August 29, 2015 14:20
Show Gist options
  • Select an option

  • Save theY4Kman/583d46d05cfcda542a72 to your computer and use it in GitHub Desktop.

Select an option

Save theY4Kman/583d46d05cfcda542a72 to your computer and use it in GitHub Desktop.
SQLAlchemy query transformations by GET params. An extensible solution.
"""An extensible system for building a SQLAlchemy query based on GET params
Design principles:
- Database operations should be easy to identify
- User input (GET params) should be loosely coupled from models
- Not all operations are valid on all columns (icontains is invalid for FLOAT)
Example:
>>> query = Token.query.with_entities(Token.id)
>>> trans = QueryTransformer([QueryColumn(Token.id)])
>>> print trans.transform(query, {'$order_by': '-id'})
SELECT token.id AS token_id
FROM token ORDER BY token.id DESC
>>> print trans.transform(query, {'id': '123'})
SELECT token.id AS token_id
FROM token
WHERE token.id = :id_1
>>> print trans.transform(query, {'id__ne': '123'})
SELECT token.id AS token_id
FROM token
WHERE token.id != :id_1
The door is left wide open for convenient invocations like this:
    @login_required
    def get_tickets():
        return jsonify(tickets=transform(Ticket))
Which would call the following rough code:
    def transform(model, args=request.args):
        columns = [get_query_column(column) for column in get_columns(model)]
        transformer = QueryTransformer(columns)
        return transformer.transform(model.query, args)
This module is only an example-stage sketch; most database operations have
not been implemented yet.
"""
class InvalidOperation(Exception):
    """Signals that a requested operation is not allowed on a column.

    Unlike ``UnknownOperation`` this does not derive from ``NameError``: the
    operation exists, it is just not permitted for the column in question.
    """
class UnknownOperation(NameError):
    """Signals that no handler exists for a requested operation name."""
class UnknownColumn(UnknownOperation):
    """Signals that a parameter referred to a column that is not registered.

    Being a subclass of ``UnknownOperation``, it is caught by any handler
    that catches unknown operations.
    """
def extend_tuple(t, length, element=None):
    """Return ``t`` as a tuple padded on the right to at least ``length`` items.

    Args:
        t: A finite iterable (tuple, list, ...). It is never modified.
        length: Minimum number of items the result must contain.
        element: Filler used for every missing trailing item.

    Returns:
        A tuple with the items of ``t`` followed by as many copies of
        ``element`` as are needed to reach ``length``. Input that is already
        long enough is returned in full (never truncated).
    """
    # Copy into a tuple first: ``+=`` on a list argument would extend the
    # caller's list in place and hand back a list instead of a tuple.
    items = tuple(t)
    missing = length - len(items)
    if missing > 0:
        items += (element,) * missing
    return items
class BaseColumnImpl(object):
    """Implements the operations that can be applied to a column.

    Every public method is an operation: it receives the transformer, the
    query column wrapper, the query being built and the (preprocessed) value,
    and returns the query produced by ``query.filter``.
    """

    def _preprocess_value(self, transformer, query_column, query, op, value):
        """Hook for converting ``value`` before an operation; a no-op here."""
        return value

    def eq(self, transformer, column, query, value, **kwargs):
        """Filter ``query`` to rows where the column equals ``value``."""
        criterion = column.column == value
        return query.filter(criterion)

    def ne(self, transformer, column, query, value, **kwargs):
        """Filter ``query`` to rows where the column differs from ``value``."""
        criterion = column.column != value
        return query.filter(criterion)
class QueryColumn(object):
    """Describes a mapping from query param to column.

    Operations are looked up by name as public methods of ``impl``; replace
    ``impl`` in a subclass to add or restrict operations for a column type.
    """

    # Object whose public methods implement the operations for this column.
    impl = BaseColumnImpl()

    def __init__(self, column, name=None):
        """Wrap ``column`` and expose it under ``name``.

        Args:
            column: The column object handed to the operation handlers.
            name: Name used to refer to the column in parameters. Falls back
                to ``column.key`` when omitted or empty.
        """
        self.column = column
        # declarative_base model column descriptors have this `key` attr
        self.name = name or column.key

    def __str__(self):
        return self.name

    def get_default_operation(self, transformer, query, value, **kwargs):
        """Name of operation to perform by default (if col=val passed)"""
        return 'eq'

    def handle_operation(self, transformer, query, op, value, **kwargs):
        """Apply operation ``op`` with ``value`` to ``query``.

        Args:
            transformer: The transformer driving the transformation.
            query: The query built so far.
            op: Operation name, or ``None`` to use the default operation.
            value: Raw parameter value; passed through
                ``impl._preprocess_value`` before reaching the handler.
            **kwargs: Forwarded untouched to the handler.

        Returns:
            Whatever the operation handler returns.

        Raises:
            UnknownOperation: If ``op`` starts with an underscore or ``impl``
                has no callable attribute of that name.
        """
        if op is None:
            op = self.get_default_operation(transformer, query, value,
                                            **kwargs)

        # Underscore-prefixed attributes are implementation hooks and
        # dunders, never operations, so they are not even looked up.
        handler = None
        if not op.startswith('_'):
            handler = getattr(self.impl, op, None)
        if not callable(handler):
            # Interpolate here: exceptions do not apply %-formatting to
            # their arguments the way logging calls do.
            raise UnknownOperation(
                'Cannot find handler for column %s operation %s' % (self, op))

        value = self.impl._preprocess_value(transformer, self, query, op,
                                            value)
        return handler(transformer, self, query, value, **kwargs)

    def can_order_by(self, direction=None):
        """Whether this column may be ordered by (in ``direction``)."""
        return True
class BaseSpecialImpl(object):
    """Perform special transformations on a query.

    Each public method implements one special operation. It receives the
    transformer, the query built so far and the (preprocessed) parameter
    value, and returns the transformed query.
    """

    def _preprocess_value(self, transformer, query, op, value):
        """Hook for converting ``value`` before an operation; a no-op here."""
        return value

    def order_by(self, transformer, query, value, **kwargs):
        """Order ``query`` by the column named in ``value``.

        Args:
            transformer: Used to resolve the column via ``get_column``.
            query: The query to add the ordering to.
            value: Column name; a leading ``-`` selects descending order,
                otherwise the order is ascending.
            **kwargs: Accepted for interface compatibility; unused.

        Returns:
            The result of ``query.order_by`` with the ordering clause.

        Raises:
            InvalidOperation: If the column's ``can_order_by`` rejects the
                requested direction.
        """
        if value.startswith('-'):
            direction = 'desc'
            value = value[1:]
        else:
            direction = 'asc'

        query_column = transformer.get_column(value)
        if not query_column.can_order_by(direction):
            # Interpolate here: exceptions do not apply %-formatting to
            # their arguments the way logging calls do.
            raise InvalidOperation(
                'Column %s cannot order by %s' % (query_column, direction))

        # ``direction`` doubles as the name of the column method to call.
        clause = getattr(query_column.column, direction)()
        return query.order_by(clause)
class QueryTransformer(object):
    """Translates GET params into a query for a mapping of columns.

    Parameter names are interpreted as follows:

    - ``<special_prefix><name>`` (``$order_by`` by default) runs the method
      ``name`` of ``special_impl``.
    - ``<column><op_separator><op>`` (``id__ne`` by default) runs operation
      ``op`` on the column registered under ``column``.
    - ``<column>`` alone runs that column's default operation.
    """

    # Object whose public methods implement the special operations.
    special_impl = BaseSpecialImpl()

    def __init__(self, columns, ignore_unknown=True, special_prefix='$',
                 op_separator='__'):
        """Set up the transformer.

        Args:
            columns: Query columns to expose; each is registered under its
                ``name`` attribute.
            ignore_unknown: When true, parameters naming an unknown column
                or operation are skipped instead of raising.
            special_prefix: Prefix marking a parameter as a special
                operation.
            op_separator: Separator between column name and operation name.
        """
        self.columns = columns
        self._column_map = {c.name: c for c in self.columns}
        self.ignore_unknown = ignore_unknown
        self.special_prefix = special_prefix
        self.op_separator = op_separator

    def get_column(self, name):
        """Return the query column registered as ``name``.

        Raises:
            UnknownColumn: If no column is registered under ``name``.
        """
        if name not in self._column_map:
            raise UnknownColumn(name)
        return self._column_map[name]

    def transform(self, query, params, **kwargs):
        """Apply every parameter in ``params`` to ``query``.

        Args:
            query: The query to start from.
            params: Mapping of parameter name to value (anything with an
                ``items()`` method).
            **kwargs: Forwarded to every handler.

        Returns:
            The query after all applicable parameters have been applied.

        Raises:
            UnknownOperation: For an unknown column or operation, but only
                when ``ignore_unknown`` is false.
        """
        for name, value in params.items():
            # For error reporting. Formatted (not joined) so a non-string
            # value cannot break the loop before it is even handled.
            original = '%s=%s' % (name, value)
            name = self.sanitize_param_name(name)
            try:
                if name.startswith(self.special_prefix):
                    special = self.sanitize_name(
                        name[len(self.special_prefix):])
                    rv = self.handle_special(
                        query, original, special, value, **kwargs)
                else:
                    col, op = self.split_column_param(name)
                    # ``None`` means "use the default operation"; there is
                    # no name to sanitize in that case.
                    if op is not None:
                        op = self.sanitize_name(op)
                    rv = self.handle_column(
                        query, original, col, op, value, **kwargs)
            except UnknownOperation:
                if not self.ignore_unknown:
                    raise
                continue
            # Handlers may return None to leave the query untouched.
            if rv is not None:
                query = rv
        return query

    def split_column_param(self, name):
        """Split a parameter name into (column, operation)

        The operation is ``None`` when ``name`` contains no separator.
        """
        parts = tuple(name.split(self.op_separator, 1))
        return extend_tuple(parts, 2)

    def handle_special(self, query, original, name, value, **kwargs):
        """Run the special operation ``name`` with ``value`` on ``query``.

        Args:
            query: The query built so far.
            original: The raw ``name=value`` parameter, for error messages.
            name: Special operation name, without the special prefix.
            value: Raw parameter value; passed through
                ``special_impl._preprocess_value`` before the handler.
            **kwargs: Forwarded untouched to the handler.

        Returns:
            Whatever the special handler returns.

        Raises:
            UnknownOperation: If ``name`` starts with an underscore or
                ``special_impl`` has no callable attribute of that name.
        """
        # ``name`` comes straight from user input. Underscore-prefixed
        # attributes (hooks such as ``_preprocess_value``, dunders such as
        # ``__class__``) are callable but are not operations, so refuse them
        # the same way QueryColumn.handle_operation does.
        handler = None
        if not name.startswith('_'):
            handler = getattr(self.special_impl, name, None)
        if not callable(handler):
            # Interpolate here: exceptions do not apply %-formatting to
            # their arguments the way logging calls do.
            raise UnknownOperation(
                'Cannot find handler for special operation %s, '
                'from parameter %r' % (name, original))

        value = self.special_impl._preprocess_value(self, query, name, value)
        return handler(self, query, value, **kwargs)

    def handle_column(self, query, original, col, op, value, **kwargs):
        """Run operation ``op`` on the column registered as ``col``.

        Raises:
            UnknownColumn: If ``col`` is not registered.
            UnknownOperation: If the column has no such operation.
        """
        query_column = self.get_column(col)
        return query_column.handle_operation(self, query, op, value, **kwargs)

    def sanitize_name(self, name):
        """Sanitize an operation/special name. For subclassing purposes."""
        return name

    def sanitize_param_name(self, name):
        """Sanitize a GET param name. For subclassing purposes."""
        return name
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment