Last active
August 29, 2015 14:20
-
-
Save theY4Kman/583d46d05cfcda542a72 to your computer and use it in GitHub Desktop.
SQLAlchemy query transformations by GET params. An extensible solution.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| """An extensible system for building a SQLAlchemy query based on GET params | |
| Design principles: | |
| - Database operations should be easy to identify | |
| - User input (GET params) should be loosely coupled from models | |
| - Not all operations are valid on all columns (icontains is invalid for FLOAT) | |
| Example: | |
| >>> query = Token.query.with_entities(Token.id) | |
| >>> trans = QueryTransformer([QueryColumn(Token.id)]) | |
| >>> print trans.transform(query, {'$order_by': '-id'}) | |
| SELECT token.id AS token_id | |
| FROM token ORDER BY token.id DESC | |
| >>> print trans.transform(query, {'id': '123'}) | |
| SELECT token.id AS token_id | |
| FROM token | |
| WHERE token.id = :id_1 | |
| >>> print trans.transform(query, {'id__ne': '123'}) | |
| SELECT token.id AS token_id | |
| FROM token | |
| WHERE token.id != :id_1 | |
| The door is left wide open for convenient invocations like this: | |
| @login_required | |
| def get_tickets(): | |
| return jsonify(tickets=transform(Ticket)) | |
| Which would call the following rough code: | |
| def transform(model, args=request.args): | |
| columns = [get_query_column(column) for column in get_columns(model)] | |
| transformer = QueryTransformer(columns) | |
| return transformer.transform(model.query, args) | |
| This module is at example state, and most database operations have not been | |
| implemented. | |
| """ | |
| class InvalidOperation(Exception): | |
| pass | |
| class UnknownOperation(NameError): | |
| pass | |
| class UnknownColumn(UnknownOperation): | |
| pass | |
| def extend_tuple(t, length, element=None): | |
| """Ensure a tuple has at least length elements""" | |
| diff = length - len(t) | |
| if diff > 0: | |
| t += (element,) * diff | |
| return t | |
| class BaseColumnImpl(object): | |
| """Perform operations on columns""" | |
| def _preprocess_value(self, transformer, query_column, query, op, value): | |
| return value | |
| def eq(self, transformer, column, query, value, **kwargs): | |
| return query.filter(column.column == value) | |
| def ne(self, transformer, column, query, value, **kwargs): | |
| return query.filter(column.column != value) | |
| class QueryColumn(object): | |
| """Describes a mapping from query param to column""" | |
| impl = BaseColumnImpl() | |
| def __init__(self, column, name=None): | |
| self.column = column | |
| # declarative_base model column descriptors have this `key` attr | |
| self.name = name or column.key | |
| def __str__(self): | |
| return self.name | |
| def get_default_operation(self, transformer, query, value, **kwargs): | |
| """Name of operation to perform by default (if col=val passed)""" | |
| return 'eq' | |
| def handle_operation(self, transformer, query, op, value, **kwargs): | |
| if op is None: | |
| op = self.get_default_operation(transformer, query, value, | |
| **kwargs) | |
| handler = getattr(self.impl, op, None) | |
| if not op.startswith('_') and handler and callable(handler): | |
| value = self.impl._preprocess_value(transformer, self, query, op, | |
| value) | |
| return handler(transformer, self, query, value, **kwargs) | |
| else: | |
| raise UnknownOperation( | |
| 'Cannot find handler for column %s operation %s', self, op) | |
| def can_order_by(self, direction=None): | |
| return True | |
| class BaseSpecialImpl(object): | |
| """Perform special transformations on a query""" | |
| def _preprocess_value(self, transformer, query, op, value): | |
| return value | |
| def order_by(self, transformer, query, value, **kwargs): | |
| direction = 'asc' | |
| if value.startswith('-'): | |
| value = value[1:] | |
| direction = 'desc' | |
| query_column = transformer.get_column(value) | |
| if not query_column.can_order_by(direction): | |
| raise InvalidOperation('Column %s cannot order by %s', | |
| query_column, direction) | |
| column = query_column.column | |
| clause = getattr(column, direction)() | |
| return query.order_by(clause) | |
| class QueryTransformer(object): | |
| """Translates GET params into a query for a mapping of columns""" | |
| special_impl = BaseSpecialImpl() | |
| def __init__(self, columns, ignore_unknown=True, special_prefix='$', | |
| op_separator='__'): | |
| self.columns = columns | |
| self._column_map = {c.name: c for c in self.columns} | |
| self.ignore_unknown = ignore_unknown | |
| self.special_prefix = special_prefix | |
| self.op_separator = op_separator | |
| def get_column(self, name): | |
| if name not in self._column_map: | |
| raise UnknownColumn(name) | |
| return self._column_map[name] | |
| def transform(self, query, params, **kwargs): | |
| for name, value in params.items(): | |
| original = '='.join((name, value)) # For error reporting | |
| name = self.sanitize_param_name(name) | |
| if name.startswith(self.special_prefix): | |
| special = name[len(self.special_prefix):] | |
| transformer = lambda: self.handle_special( | |
| query, original, special, value, **kwargs) | |
| else: | |
| col, op = self.split_column_param(name) | |
| transformer = lambda: self.handle_column( | |
| query, original, col, op, value, **kwargs) | |
| try: | |
| rv = transformer() | |
| except UnknownOperation: | |
| if not self.ignore_unknown: | |
| raise | |
| continue | |
| else: | |
| if rv is not None: | |
| query = rv | |
| return query | |
| def split_column_param(self, name): | |
| """Split a parameter name into (column, operation)""" | |
| _split_param = name.split(self.op_separator, 1) | |
| return extend_tuple(_split_param, 2) | |
| def handle_special(self, query, original, name, value, **kwargs): | |
| handler = getattr(self.special_impl, name, None) | |
| if handler and callable(handler): | |
| value = self.special_impl._preprocess_value(self, query, name, | |
| value) | |
| return handler(self, query, value, **kwargs) | |
| else: | |
| raise UnknownOperation( | |
| 'Cannot find handler for special operation %s, ' | |
| 'from parameter %r', name, original) | |
| def handle_column(self, query, original, col, op, value, **kwargs): | |
| query_column = self.get_column(col) | |
| return query_column.handle_operation(self, query, op, value, **kwargs) | |
| def sanitize_name(self, name): | |
| """Sanitize an operation/special name. For subclassing purposes.""" | |
| return name | |
| def sanitize_param_name(self, name): | |
| """Sanitize a GET param name. For subclassing purposes.""" | |
| return name |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment