Skip to content

Instantly share code, notes, and snippets.

@bencharb
Last active October 13, 2015 11:04
Show Gist options
  • Select an option

  • Save bencharb/5e24700b1333ed614545 to your computer and use it in GitHub Desktop.

Select an option

Save bencharb/5e24700b1333ed614545 to your computer and use it in GitHub Desktop.
Mongo expression builder with Django style queries
# BUILD A MONGO QUERY WITH DJANGO QUERY STYLE
# THIS WORKS FOR BASIC QUERIES, BUT IS MISSING SEVERAL OPERATORS
#
# In [1083]:
# birds = ME(color__in=('yellow', 'orange',), name='Big Bird')
# feathers = ME(feathers__count__gt=1000)
# In [1084]:
# print birds.expression
# print feathers.expression
# {'$and': [{'color': {'$in': ('yellow', 'orange')}}, {'name': {'$eq': 'Big Bird'}}]}
# {'feathers.count': {'$gt': 1000}}
# In [1085]:
# birds_without_many_feathers = birds & feathers
# In [1086]:
# birds_without_many_feathers.expression
# Out[1086]:
# {u'$and': [{'color': {'$in': ('yellow', 'orange')}},
# {'name': {'$eq': 'Big Bird'}},
# {'feathers.count': {'$gt': 1000}}]}
class MongoOperators(object):
OPERATOR_MAP = {
'eq':'$eq',
'gt':'$gt',
'gte':'$gte',
'lt':'$lt',
'lte':'$lte',
'ne':'$ne',
'in':'$in',
'nin':'$nin',
'exists':'$exists',
'or':'$or',
'and':'$and',
'not':'$notr',
'nor':'$nor',
}
MODIFIERS = ('not','$not',)
COMPARERS = ('and', 'or', 'nor',
'$and', '$or', '$nor',)
DEFAULT_LOGICAL = '$and'
DEFAULT_OPERATOR = '$eq'
@classmethod
def get(cls, op):
if op.startswith('$'):
return op
return cls.OPERATOR_MAP[op]
class _MongoOperator(unicode):
@property
def is_comparer(self):
return self.normalized in MongoOperators.COMPARERS
@property
def normalized(self):
return self.normalize(self).next()
@property
def is_modifier(self):
return self.normalized in MongoOperators.MODIFIERS
@property
def can_flatten(self):
return not self.normalized.is_modifier
@classmethod
def normalize(cls, *ops):
for op in ops:
if op == '':
yield cls(MongoOperators.DEFAULT_OPERATOR)
else:
yield op
@classmethod
def eq(cls, op1, op2):
normal = cls.normalize(op1, op2)
return all(normal)
@classmethod
def can_merge(cls, *operators):
return all([cls.eq(operators[0], o) and cls(o).can_flatten for o in operators])
def MongoOperator(val):
if val is None:
return _MongoOperator()
return _MongoOperator(val)
class MongoExpression(object):
KEY_SEP = '__'
EXP_SEP = '.'
def __init__(self, path_seq, op):
super(MongoExpression, self).__init__()
self.path_seq = path_seq
self.op = MongoOperator(op)
@classmethod
def from_key(cls, key):
parts = key.split(cls.KEY_SEP)
if len(parts) > 1 and parts[-1] in MongoOperators.OPERATOR_MAP:
op = parts.pop()
return cls(parts, op)
return cls(parts, MongoOperators.DEFAULT_OPERATOR)
@property
def key(self):
seq = self.KEY_SEP.join(self.path_seq)
if self.op == MongoOperators.DEFAULT_OPERATOR:
return seq
return self.KEY_SEP.join(seq, self.op)
@property
def expression_maker(self):
exp_key = self.EXP_SEP.join(self.path_seq)
def decorator(val):
return {exp_key:{MongoOperators.get(self.op):val}}
return decorator
@classmethod
def create(cls, **querydict):
if len(querydict) > 1:
raise Exception('only one expression per object')
key, val = querydict.items()[0]
obj = cls.from_key(key)
maker = obj.expression_maker
return maker(val)
class MongoLogicalExpression(object):
def __init__(self, *mongo_expressions, **kwargs):
super(MongoLogicalExpression, self).__init__()
self.DEFAULT_LOGICAL = MongoOperator(MongoOperators.DEFAULT_LOGICAL)
self.logical_operator = MongoOperator(kwargs.pop('logical_operator', None))
self.mongo_expressions = list(mongo_expressions)
def operator_(self, other, op_value):
if _MongoOperator.can_merge(self.logical_operator, other.logical_operator, op_value):
exprs = list(self.mongo_expressions)
exprs.extend(list(other.mongo_expressions))
return self.__class__(*exprs, logical_operator=op_value)
return self.__class__(self._nested_expression, other._nested_expression, logical_operator=op_value)
def __and__(self, other):
if self == other:
return self
return self.operator_(other, '$and')
def __or__(self, other):
if self == other:
return self
return self.operator_(other, '$or')
def __invert__(self):
expr = self._single_expression or self.expression
return self.__class__(expr, logical_operator='$not')
@property
def expression(self):
single = self._single_expression
if single is not None:
return single
logical = self.logical_operator if self.logical_operator else self.DEFAULT_LOGICAL
return {logical:self.mongo_expressions}
@property
def _single_expression(self):
if len(self.mongo_expressions) != 1:
return
if self.logical_operator.is_modifier:
return {self.logical_operator:self.mongo_expressions[0]}
if self.logical_operator.is_comparer:
return self.mongo_expressions[0]
@property
def _nested_expression(self):
if self.logical_operator.can_flatten:
return self.mongo_expressions
return {self.logical_operator:self.mongo_expressions}
@staticmethod
def ME(*args, **querydict):
# In [1083]:
# birds = ME(color__in=('yellow', 'orange',), name='Big Bird')
# feathers = ME(feathers__count__gt=1000)
# In [1084]:
# print birds.expression
# print feathers.expression
# {'$and': [{'color': {'$in': ('yellow', 'orange')}}, {'name': {'$eq': 'Big Bird'}}]}
# {'feathers.count': {'$gt': 1000}}
# In [1085]:
# birds_without_many_feathers = birds & feathers
# In [1086]:
# birds_without_many_feathers.expression
# Out[1086]:
# {u'$and': [{'color': {'$in': ('yellow', 'orange')}},
# {'name': {'$eq': 'Big Bird'}},
# {'feathers.count': {'$gt': 1000}}]}
logical = args[0] if args else None
objs = []
for k, v in querydict.iteritems():
objs.append(MongoExpression.create(**{k:v}))
logical_exprs = MongoLogicalExpression(*objs, logical_operator=logical)
return logical_exprs
ME = MongoLogicalExpression.ME
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment