import json
from itertools import islice

import pymongo
def db_connection(db_name, collection_name, uri='mongodb://localhost:27017/'):
    """Open a MongoDB collection and report how many documents it holds.

    Args:
        db_name: Name of the database to open.
        collection_name: Name of the collection inside ``db_name``.
        uri: MongoDB connection string. Defaults to the local server the
            notebook was written against.

    Returns:
        The collection object ``client[db_name][collection_name]``.
    """
    client = pymongo.MongoClient(uri)
    collection = client[db_name][collection_name]
    # Collection.count() was deprecated in PyMongo 3.7 and removed in 4.0;
    # count_documents({}) is the supported exact count of every document.
    print('db {} connected, collection {} current data: {}'.format(
        db_name, collection_name, collection.count_documents({})))
    return collection
def db_tmp():
    """Return the scratch collection ``_tmp_`` of database ``_tmp_``, dropped.

    The collection is opened through ``db_connection``, which prints its
    current document count. It is then dropped, so the returned handle
    starts with no documents.
    """
    scratch = db_connection("_tmp_", "_tmp_")
    scratch.drop()
    return scratch
tmp = db_tmp()
# json file data: http://media.mongodb.org/zips.json?_ga=1.172262627.1124773036.1490323698
# The file holds one JSON document per line; the `with` block closes it.
with open("zips.json", encoding="utf-8") as fp:
    json_lines = fp.readlines()
# Skip blank lines, which json.loads would reject.
documents = [json.loads(line) for line in json_lines if line.strip()]
# One bulk insert instead of a server round trip per document;
# insert_many raises on an empty list, hence the guard.
if documents:
    tmp.insert_many(documents)
db _tmp_ connected, collection _tmp_ current data: 0
# Look at one loaded document to see the fields: _id, city, loc, pop, state.
tmp.find_one({})
{'_id': '01001',
'city': 'AGAWAM',
'loc': [-72.622739, 42.070206],
'pop': 15338,
'state': 'MA'}
All of the following examples use PyMongo's `Collection.aggregate()` method, the Python driver's counterpart of the `aggregate()` helper in the mongo shell.
The aggregate() method uses the aggregation pipeline to process documents into aggregated results. An aggregation pipeline consists of stages, with each stage processing the documents as they pass along the pipeline. Documents pass through the stages in sequence.
The aggregate() method in the mongo shell provides a wrapper around the aggregate database command. See the documentation for your driver for a more idiomatic interface for data aggregation operations.
# Return states whose total population is at least 10 million.
# Stage 1: sum the population of every zip code within each state.
group_by_state = {"$group": {"_id": "$state", "totalPop": {"$sum": "$pop"}}}
# Stage 2: keep only states whose total reaches the threshold.
over_ten_million = {"$match": {"totalPop": {"$gte": 10 * 1e6}}}
pipeline = [group_by_state, over_ten_million]
cursor = tmp.aggregate(pipeline)
list(cursor)
[{'_id': 'IL', 'totalPop': 11427576},
{'_id': 'OH', 'totalPop': 10846517},
{'_id': 'FL', 'totalPop': 12686644},
{'_id': 'NY', 'totalPop': 17990402},
{'_id': 'PA', 'totalPop': 11881643},
{'_id': 'TX', 'totalPop': 16984601},
{'_id': 'CA', 'totalPop': 29754890}]
The equivalent SQL for this aggregation operation is:
SELECT state, SUM(pop) AS totalPop
FROM zipcodes
GROUP BY state
HAVING SUM(pop) >= (10*1000*1000)
# Return the ten states with the highest average city population.
# Stage 1: total population of each (state, city) pair across its zip codes.
city_totals = {"$group": {"_id": {"state": "$state", "city": "$city"},
                          "city_pop": {"$sum": "$pop"}}}
# Stage 2: average those city totals within each state.
state_average = {"$group": {"_id": "$_id.state",
                            "avgCityPop": {"$avg": "$city_pop"}}}
# Stages 3-4: highest average first, keep the top ten.
pipeline = [city_totals, state_average,
            {"$sort": {"avgCityPop": -1}}, {"$limit": 10}]
cursor = tmp.aggregate(pipeline)
list(cursor)
[{'_id': 'DC', 'avgCityPop': 303450.0},
{'_id': 'CA', 'avgCityPop': 27756.42723880597},
{'_id': 'FL', 'avgCityPop': 27400.958963282937},
{'_id': 'AZ', 'avgCityPop': 20591.16853932584},
{'_id': 'RI', 'avgCityPop': 19292.653846153848},
{'_id': 'NV', 'avgCityPop': 18209.590909090908},
{'_id': 'HI', 'avgCityPop': 15831.842857142858},
{'_id': 'NJ', 'avgCityPop': 15775.89387755102},
{'_id': 'MA', 'avgCityPop': 14855.37037037037},
{'_id': 'CT', 'avgCityPop': 14674.625}]
# Return Largest and Smallest Cities by State
# do it step by step
# Step 1: total population per (state, city) pair, ordered from the least to
# the most populous city. Cities with equal totals (e.g. many with 0) come
# out in no particular relative order.
pipeline = [
    {"$group": {"_id": {"state": "$state", "city": "$city"},
                "city_pop": {"$sum": "$pop"}}},
    {"$sort": {"city_pop": 1}},
]
cursor = tmp.aggregate(pipeline)
# Preview the first ten results without building a list of every city first.
list(islice(cursor, 10))
[{'_id': {'city': 'SELAWIK', 'state': 'AK'}, 'city_pop': 0},
{'_id': {'city': 'RUSSIAN MISSION', 'state': 'AK'}, 'city_pop': 0},
{'_id': {'city': 'NAKNEK', 'state': 'AK'}, 'city_pop': 0},
{'_id': {'city': 'GRAYLING', 'state': 'AK'}, 'city_pop': 0},
{'_id': {'city': 'CHEVAK', 'state': 'AK'}, 'city_pop': 0},
{'_id': {'city': 'ODELL', 'state': 'OR'}, 'city_pop': 0},
{'_id': {'city': 'NINOLE', 'state': 'HI'}, 'city_pop': 0},
{'_id': {'city': 'OREGON HOUSE', 'state': 'CA'}, 'city_pop': 0},
{'_id': {'city': 'ALLEGHANY', 'state': 'CA'}, 'city_pop': 0},
{'_id': {'city': 'TWIN BRIDGES', 'state': 'CA'}, 'city_pop': 0}]
# regroup the last result by state and return the first and last city name and population
# Build on the two stages of step 1 with a new list rather than `pipeline += ...`:
# in-place += would append another $group stage every time this cell is re-run.
pipeline = pipeline[:2] + [
    {"$group": {"_id": "$_id.state",
                # Input arrives sorted by ascending city_pop, so the last city
                # seen per state is the most populous and the first the least.
                "biggestCity": {"$last": "$_id.city"},
                "biggestPop": {"$last": "$city_pop"},
                "smallestCity": {"$first": "$_id.city"},
                "smallestPop": {"$first": "$city_pop"}}},
]
cursor = tmp.aggregate(pipeline)
# Preview three states without materialising the whole result.
list(islice(cursor, 3))
[{'_id': 'DE',
'biggestCity': 'NEWARK',
'biggestPop': 111674,
'smallestCity': 'BETHEL',
'smallestPop': 108},
{'_id': 'MO',
'biggestCity': 'SAINT LOUIS',
'biggestPop': 397802,
'smallestCity': 'BENDAVIS',
'smallestPop': 44},
{'_id': 'IL',
'biggestCity': 'CHICAGO',
'biggestPop': 2452177,
'smallestCity': 'ANCONA',
'smallestPop': 38}]
# modify the output format by $project
# Build on the three stages of steps 1-2 with a new list rather than `+=`:
# a second stacked $project would run on already-projected documents (no _id,
# so "state" would vanish) every time this cell is re-run.
pipeline = pipeline[:3] + [
    {"$project": {
        "_id": 0,
        "state": "$_id",
        "biggestCity": {"name": "$biggestCity", "pop": "$biggestPop"},
        "smallestCity": {"name": "$smallestCity", "pop": "$smallestPop"},
    }},
]
cursor = tmp.aggregate(pipeline)
# Preview three states without materialising the whole result.
list(islice(cursor, 3))
[{'biggestCity': {'name': 'NEWARK', 'pop': 111674},
'smallestCity': {'name': 'BETHEL', 'pop': 108},
'state': 'DE'},
{'biggestCity': {'name': 'SAINT LOUIS', 'pop': 397802},
'smallestCity': {'name': 'BENDAVIS', 'pop': 44},
'state': 'MO'},
{'biggestCity': {'name': 'CHICAGO', 'pop': 2452177},
'smallestCity': {'name': 'ANCONA', 'pop': 38},
'state': 'IL'}]
# Display the complete pipeline assembled in the steps above:
# $group by (state, city) -> $sort by city_pop -> $group by state -> $project.
pipeline
[{'$group': {'_id': {'city': '$city', 'state': '$state'},
'city_pop': {'$sum': '$pop'}}},
{'$sort': {'city_pop': 1}},
{'$group': {'_id': '$_id.state',
'biggestCity': {'$last': '$_id.city'},
'biggestPop': {'$last': '$city_pop'},
'smallestCity': {'$first': '$_id.city'},
'smallestPop': {'$first': '$city_pop'}}},
{'$project': {'_id': 0,
'biggestCity': {'name': '$biggestCity', 'pop': '$biggestPop'},
'smallestCity': {'name': '$smallestCity', 'pop': '$smallestPop'},
'state': '$_id'}}]
Aggregation Pipeline Quick Reference