Last active
October 13, 2015 11:04
-
-
Save bencharb/5e24700b1333ed614545 to your computer and use it in GitHub Desktop.
Mongo expression builder with Django style queries
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # BUILD A MONGO QUERY WITH DJANGO QUERY STYLE | |
| # THIS WORKS FOR BASIC QUERIES, BUT IS MISSING SEVERAL OPERATORS | |
| # | |
| # In [1083]: | |
| # birds = ME(color__in=('yellow', 'orange',), name='Big Bird') | |
| # feathers = ME(feathers__count__gt=1000) | |
| # In [1084]: | |
| # print birds.expression | |
| # print feathers.expression | |
| # {'$and': [{'color': {'$in': ('yellow', 'orange')}}, {'name': {'$eq': 'Big Bird'}}]} | |
| # {'feathers.count': {'$gt': 1000}} | |
| # In [1085]: | |
| # birds_without_many_feathers = birds & feathers | |
| # In [1086]: | |
| # birds_without_many_feathers.expression | |
| # Out[1086]: | |
| # {u'$and': [{'color': {'$in': ('yellow', 'orange')}}, | |
| # {'name': {'$eq': 'Big Bird'}}, | |
| # {'feathers.count': {'$gt': 1000}}]} | |
| class MongoOperators(object): | |
| OPERATOR_MAP = { | |
| 'eq':'$eq', | |
| 'gt':'$gt', | |
| 'gte':'$gte', | |
| 'lt':'$lt', | |
| 'lte':'$lte', | |
| 'ne':'$ne', | |
| 'in':'$in', | |
| 'nin':'$nin', | |
| 'exists':'$exists', | |
| 'or':'$or', | |
| 'and':'$and', | |
| 'not':'$notr', | |
| 'nor':'$nor', | |
| } | |
| MODIFIERS = ('not','$not',) | |
| COMPARERS = ('and', 'or', 'nor', | |
| '$and', '$or', '$nor',) | |
| DEFAULT_LOGICAL = '$and' | |
| DEFAULT_OPERATOR = '$eq' | |
| @classmethod | |
| def get(cls, op): | |
| if op.startswith('$'): | |
| return op | |
| return cls.OPERATOR_MAP[op] | |
| class _MongoOperator(unicode): | |
| @property | |
| def is_comparer(self): | |
| return self.normalized in MongoOperators.COMPARERS | |
| @property | |
| def normalized(self): | |
| return self.normalize(self).next() | |
| @property | |
| def is_modifier(self): | |
| return self.normalized in MongoOperators.MODIFIERS | |
| @property | |
| def can_flatten(self): | |
| return not self.normalized.is_modifier | |
| @classmethod | |
| def normalize(cls, *ops): | |
| for op in ops: | |
| if op == '': | |
| yield cls(MongoOperators.DEFAULT_OPERATOR) | |
| else: | |
| yield op | |
| @classmethod | |
| def eq(cls, op1, op2): | |
| normal = cls.normalize(op1, op2) | |
| return all(normal) | |
| @classmethod | |
| def can_merge(cls, *operators): | |
| return all([cls.eq(operators[0], o) and cls(o).can_flatten for o in operators]) | |
| def MongoOperator(val): | |
| if val is None: | |
| return _MongoOperator() | |
| return _MongoOperator(val) | |
| class MongoExpression(object): | |
| KEY_SEP = '__' | |
| EXP_SEP = '.' | |
| def __init__(self, path_seq, op): | |
| super(MongoExpression, self).__init__() | |
| self.path_seq = path_seq | |
| self.op = MongoOperator(op) | |
| @classmethod | |
| def from_key(cls, key): | |
| parts = key.split(cls.KEY_SEP) | |
| if len(parts) > 1 and parts[-1] in MongoOperators.OPERATOR_MAP: | |
| op = parts.pop() | |
| return cls(parts, op) | |
| return cls(parts, MongoOperators.DEFAULT_OPERATOR) | |
| @property | |
| def key(self): | |
| seq = self.KEY_SEP.join(self.path_seq) | |
| if self.op == MongoOperators.DEFAULT_OPERATOR: | |
| return seq | |
| return self.KEY_SEP.join(seq, self.op) | |
| @property | |
| def expression_maker(self): | |
| exp_key = self.EXP_SEP.join(self.path_seq) | |
| def decorator(val): | |
| return {exp_key:{MongoOperators.get(self.op):val}} | |
| return decorator | |
| @classmethod | |
| def create(cls, **querydict): | |
| if len(querydict) > 1: | |
| raise Exception('only one expression per object') | |
| key, val = querydict.items()[0] | |
| obj = cls.from_key(key) | |
| maker = obj.expression_maker | |
| return maker(val) | |
| class MongoLogicalExpression(object): | |
| def __init__(self, *mongo_expressions, **kwargs): | |
| super(MongoLogicalExpression, self).__init__() | |
| self.DEFAULT_LOGICAL = MongoOperator(MongoOperators.DEFAULT_LOGICAL) | |
| self.logical_operator = MongoOperator(kwargs.pop('logical_operator', None)) | |
| self.mongo_expressions = list(mongo_expressions) | |
| def operator_(self, other, op_value): | |
| if _MongoOperator.can_merge(self.logical_operator, other.logical_operator, op_value): | |
| exprs = list(self.mongo_expressions) | |
| exprs.extend(list(other.mongo_expressions)) | |
| return self.__class__(*exprs, logical_operator=op_value) | |
| return self.__class__(self._nested_expression, other._nested_expression, logical_operator=op_value) | |
| def __and__(self, other): | |
| if self == other: | |
| return self | |
| return self.operator_(other, '$and') | |
| def __or__(self, other): | |
| if self == other: | |
| return self | |
| return self.operator_(other, '$or') | |
| def __invert__(self): | |
| expr = self._single_expression or self.expression | |
| return self.__class__(expr, logical_operator='$not') | |
| @property | |
| def expression(self): | |
| single = self._single_expression | |
| if single is not None: | |
| return single | |
| logical = self.logical_operator if self.logical_operator else self.DEFAULT_LOGICAL | |
| return {logical:self.mongo_expressions} | |
| @property | |
| def _single_expression(self): | |
| if len(self.mongo_expressions) != 1: | |
| return | |
| if self.logical_operator.is_modifier: | |
| return {self.logical_operator:self.mongo_expressions[0]} | |
| if self.logical_operator.is_comparer: | |
| return self.mongo_expressions[0] | |
| @property | |
| def _nested_expression(self): | |
| if self.logical_operator.can_flatten: | |
| return self.mongo_expressions | |
| return {self.logical_operator:self.mongo_expressions} | |
| @staticmethod | |
| def ME(*args, **querydict): | |
| # In [1083]: | |
| # birds = ME(color__in=('yellow', 'orange',), name='Big Bird') | |
| # feathers = ME(feathers__count__gt=1000) | |
| # In [1084]: | |
| # print birds.expression | |
| # print feathers.expression | |
| # {'$and': [{'color': {'$in': ('yellow', 'orange')}}, {'name': {'$eq': 'Big Bird'}}]} | |
| # {'feathers.count': {'$gt': 1000}} | |
| # In [1085]: | |
| # birds_without_many_feathers = birds & feathers | |
| # In [1086]: | |
| # birds_without_many_feathers.expression | |
| # Out[1086]: | |
| # {u'$and': [{'color': {'$in': ('yellow', 'orange')}}, | |
| # {'name': {'$eq': 'Big Bird'}}, | |
| # {'feathers.count': {'$gt': 1000}}]} | |
| logical = args[0] if args else None | |
| objs = [] | |
| for k, v in querydict.iteritems(): | |
| objs.append(MongoExpression.create(**{k:v})) | |
| logical_exprs = MongoLogicalExpression(*objs, logical_operator=logical) | |
| return logical_exprs | |
| ME = MongoLogicalExpression.ME | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment