Object used to support the construction and manipulation of a MongoDB aggregation
#!/usr/bin/env python
from pymongo import MongoClient
from datetime import datetime

class Cache(object):
    """Snapshot of the pipeline taken at a given moment."""

    def __init__(self, cached):
        self.date = datetime.today()
        # keep a copy so later pipeline changes do not alter the snapshot
        self.cached = list(cached)

def InQuery(*args):
    return {'$in': list(args)}

def DateQuery(**kwargs):
    query = {}
    for key in kwargs.keys():
        query['${0}'.format(key)] = kwargs[key]
    return query

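# Examples (hypothetical values):
#   InQuery(1, 2, 3)                     -> {'$in': [1, 2, 3]}
#   DateQuery(gte=datetime(2015, 1, 1))  -> {'$gte': datetime(2015, 1, 1)}
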
class MongoManager(object):
    """
    MongoManager is an object that supports the construction and
    manipulation of a MongoDB aggregation pipeline; it was written
    because of the lack of documentation and the difficulty of finding
    a package that builds such aggregations simply.
    """

    def __init__(self, database, collection, cache=False):
        db = MongoClient()[database]
        self.__collection = db[collection]
        self.__enable_cache = cache
        # per-instance state (class-level mutable defaults would be
        # shared between every MongoManager instance)
        self.__pipeline = []
        self.__history = []

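    # Example (hypothetical database and collection names):
    #   manager = MongoManager('blog', 'posts', cache=True)
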
    def __cached(func):
        """
        Decorator that stores a snapshot of the current pipeline in the
        history before the decorated method modifies it, when caching
        is enabled.
        """
        def _wrapper(self, *args, **kwargs):
            if self.__enable_cache:
                self.__history.append(Cache(self.__pipeline))
            return func(self, *args, **kwargs)
        return _wrapper

    def __convert_fields(self, **fields):
        # translate keys like 'author__name' into MongoDB dotted paths
        # such as 'author.name'
        new_fields = {}
        for key in fields.keys():
            new_fields[key.replace('__', '.')] = fields[key]
        return new_fields

    @__cached
    def project(self, **fields):
        """
        Select the fields that will be available in the pipeline
        ($project stage). Any stage added after project only sees the
        fields kept by it.
        """
        project = {
            '$project': self.__convert_fields(**fields)
        }
        self.__pipeline.append(project)
        return self

    @__cached
    def unwind(self, field):
        """
        Descend one level into the given array field ($unwind stage),
        useful when filtering inside a list of embedded documents
        (one-to-many relationships with embedded documents).
        """
        self.__pipeline.append({'$unwind': field})
        return self

    @__cached
    def match(self, **fields):
        """
        Filter documents according to the given query fields ($match
        stage). Once the pipeline is built, extra filters can still be
        added through this class's filter(**kwargs) method.
        """
        self.__pipeline.append(
            {'$match': self.__convert_fields(**fields)}
        )
        return self

    @__cached
    def group(self, count=False, **fields):
        """
        Group the related documents under the _id key, which MongoDB
        requires for a $group stage. The count parameter adds a counter
        of the grouped documents, useful for statistics.
        """
        group = {'$group': {'_id': self.__convert_fields(**fields)}}
        if count:
            group['$group']['count'] = {'$sum': 1}
        self.__pipeline.append(group)
        return self

    @__cached
    def sort(self, **fields):
        """
        Order the results ($sort stage), using 1 for ascending order
        and -1 for descending order.
        WARNING:
        be careful when using the sort method: through pymongo it is
        not possible to sort numbers, strings and dates at the same
        level of the hierarchy.
        """
        self.__pipeline.append({'$sort': self.__convert_fields(**fields)})
        return self

    @__cached
    def filter(self, **kwargs):
        """
        Add the given fields to the pipeline's first $project stage, so
        they stay available, and add them to the existing $match stage
        as extra filters.
        """
        fields = self.__convert_fields(**kwargs)
        # assumes the pipeline starts with a $project stage and already
        # contains a $match stage (see project() and match())
        index = self.__get_index('$match')
        for key in fields.keys():
            self.__pipeline[0]['$project'][key] = 1
            self.__pipeline[index]['$match'][key] = fields[key]
        return self

    def __get_index(self, field):
        # index of the first stage that contains the given key, or None
        # when no such stage exists
        for count, stage in enumerate(self.__pipeline):
            if stage.get(field):
                return count

    def print_cached(self):
        print(self.__history)

    def print_pipeline(self):
        print(self.__pipeline)

    def __iter__(self):
        # pymongo < 3 returns the aggregation result as a dict with a
        # 'result' key; newer versions return a cursor instead
        return iter(self.execute()['result'])

    def __str__(self):
        return str(self.__pipeline)

    def execute(self):
        return self.__collection.aggregate(self.__pipeline)
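
# Minimal usage sketch (assumes a locally running MongoDB, a hypothetical
# 'blog' database with a 'posts' collection, and illustrative field names;
# iterating over the manager relies on pymongo < 3, where aggregate()
# returns a dict):
if __name__ == '__main__':
    manager = MongoManager('blog', 'posts', cache=True)
    manager.project(title=1, status=1, author__name=1) \
        .match(status='published') \
        .group(count=True, author='$author.name') \
        .sort(count=-1)
    manager.print_pipeline()
    manager.filter(author__name='Ana')  # extends the $project and $match stages
    for doc in manager:
        print(doc)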