Skip to content

Instantly share code, notes, and snippets.

@kublermdk
Created May 10, 2020 09:09
Show Gist options
  • Save kublermdk/1a8c79823e1ea7421e6db60b00c1171e to your computer and use it in GitHub Desktop.
Save kublermdk/1a8c79823e1ea7421e6db60b00c1171e to your computer and use it in GitHub Desktop.
An example of the MongoDB Aggregation pipelines and commands used for some general filters. View
// -----------------------------------------------------------------
// Applying aggregation pipeline #1 Customer - HAVE
// -----------------------------------------------------------------
// Pipeline #1 (HAVE, customer fields): seeds the customerGroup collection with the _id of
// every customer that passes the status, age-range and state filters.
// Written for mongosh: db.runCommand() is the shell helper for raw database commands, and
// the aggregate command requires the cursor option (an empty document uses the defaults).
db.runCommand({
  aggregate: "app.customer",
  pipeline: [
    {
      $match: {
        $and: [
          {
            // -- Ensure the customers we are selecting are active and have a Firebase token
            //    so we can send them a push notification
            status: "active",
            // We likely wouldn't add the firebaseToken check if sending coupons
            firebaseToken: { $exists: true, $ne: null },
          },
          {
            // -- Checking for the Age ranges 0-20, 31-40, 41-50, 51-60.
            //    Each entry is one dateOfBirth window (values are presumably unix
            //    timestamps in seconds); a customer only has to fall inside one of them.
            $or: [
              { dateOfBirth: { $lte: 1589068800, $gte: 957916800 } },
              { dateOfBirth: { $lte: 610761600, $gte: 326764800 } },
              { dateOfBirth: { $lte: 295142400, $gte: 11145600 } },
              { dateOfBirth: { $lte: -20390400, $gte: -304387200 } },
            ],
          },
          {
            // Check for the state
            state: { $in: ["SA", "Vic", "QLD", "NSW", "WA", "Tas"] },
          },
        ],
      },
    },
    {
      // Only get the Customer's _id field
      $project: { _id: 1 },
    },
    {
      $merge: {
        into: "customerGroup-5eb78328af2f5c079a1e9352",
        on: "_id",
        whenMatched: "keepExisting",
        // This is the first aggregation run, so we just insert all entries, don't need to merge
        whenNotMatched: "insert",
      },
    },
  ],
  cursor: {},
});
// -----------------------------------------------------------------
// Applying aggregation pipeline #2 Transaction - HAVE
// -----------------------------------------------------------------
// Pipeline #2 (HAVE, transaction fields): marks with selected:true every customer already in
// the customerGroup collection who has a matching transaction. Customers not already in the
// collection are discarded by the $merge stage.
// Written for mongosh: db.runCommand() is the shell helper for raw database commands, the
// aggregate command requires the cursor option, and ids are passed as ObjectId(...) values
// rather than as PHP-stringified "MongoDB\BSON\ObjectId(...)" text, which would only ever
// match a literal string.
db.runCommand({
  aggregate: "app.customerTransaction",
  pipeline: [
    {
      $match: {
        // We thankfully can $and all the HAVE $match'es together.
        // NOTE(review): both filters below are tested against the same transaction document,
        // so a customer is kept only when a single transaction satisfies both of them.
        // Confirm that is the intended meaning of combining the two filters.
        $and: [
          {
            // -- Search for customers who in the last 60 days have shopped at the listed stores
            transactionDate: { $gte: 1583884800 },
            storeId: {
              $in: [
                ObjectId("5e6111bcaf2f5c1d9f7f88f2"),
                ObjectId("5e6111bcaf2f5c1d9f7f88f3"),
                ObjectId("5e6111bcaf2f5c1d9f7f88f4"),
                ObjectId("5e6111bcaf2f5c1d9f7f88f5"),
                ObjectId("5e6111bcaf2f5c1d9f7f88f6"),
                ObjectId("5e6111bcaf2f5c1d9f7f88f7"),
                ObjectId("5e6111bcaf2f5c1d9f7f88fb"),
              ],
            },
          },
          {
            // -- For the Exclude of "Haven't Purchased With Product Categories X Within Y Days" filter
            //    we convert it to an include (select) of those who HAVE purchased the various
            //    Product Categories in the last 100 days, as we are then removing those who
            //    haven't made such a purchase.
            // NOTE(review): 1583884800 above is later than 1580428800, so this date bound is
            // already implied by the first filter when both are $and'ed on one document.
            transactionDate: { $gte: 1580428800 },
            productCategories: {
              $in: [
                ObjectId("5e79a458af2f5c1f95415062"),
                ObjectId("5e79a458af2f5c1f95415084"),
                ObjectId("5e79a458af2f5c1f95415087"),
                ObjectId("5e79a458af2f5c1f9541508b"),
                ObjectId("5e79a458af2f5c1f9541508f"),
                ObjectId("5e79a458af2f5c1f9541508d"),
              ],
            },
          },
        ],
      },
    },
    {
      // Select just the customerId field (one output document per distinct customer)
      $group: { _id: "$customerId" },
    },
    {
      // Add a selected: true to the customers we match on
      $addFields: { selected: true },
    },
    {
      $merge: {
        into: "customerGroup-5eb78328af2f5c079a1e9352",
        on: "_id",
        // Merge the results, setting selected:true to these
        whenMatched: "merge",
        // We aren't adding any new customers, we are filtering down the existing set
        whenNotMatched: "discard",
      },
    },
  ],
  cursor: {},
});
// Delete all entries we didn't just mark as selected.
// bulkWrite() takes operation documents keyed by the operation name; deleteMany removes
// every document that matches its filter.
db.getCollection("customerGroup-5eb78328af2f5c079a1e9352").bulkWrite([
  { deleteMany: { filter: { selected: { $exists: false } } } },
]);
// Remove the selected field, ready for the next pipelines.
// bulkWrite() takes operation documents keyed by the operation name; updateMany applies the
// $unset to every document that still carries the field, and never inserts a new document.
db.getCollection("customerGroup-5eb78328af2f5c079a1e9352").bulkWrite([
  {
    updateMany: {
      filter: { selected: { $exists: true } },
      update: { $unset: { selected: "" } },
      upsert: false,
    },
  },
]);
// End of pipeline 2
// -----------------------------------------------------------------
// Applying aggregation pipeline #3 Customer - Have Not
// -----------------------------------------------------------------
// Pipeline #3 (HAVE NOT, customer fields): flags with excluded:true every customer in the
// customerGroup collection that matches an exclusion filter. Customers that are not already
// in the collection are discarded by the $merge stage.
// Written for mongosh: db.runCommand() is the shell helper for raw database commands, and
// the aggregate command requires the cursor option (an empty document uses the defaults).
db.runCommand({
  aggregate: "app.customer",
  pipeline: [
    {
      $match: {
        $or: [
          {
            // Excluding the males by
            // selecting the males and then removing them
            gender: { $in: ["male"] },
          },
        ],
      },
    },
    {
      // Just want the customer._id field
      $project: { _id: 1 },
    },
    {
      // Add an excluded:true field to the ones we match
      $addFields: { excluded: true },
    },
    {
      $merge: {
        into: "customerGroup-5eb78328af2f5c079a1e9352",
        on: "_id",
        // Here's where we say we are merging the excluded field
        whenMatched: "merge",
        whenNotMatched: "discard",
      },
    },
  ],
  cursor: {},
});
// Delete the excluded customers.
// bulkWrite() takes operation documents keyed by the operation name; deleteMany removes
// every document that matches its filter.
db.getCollection("customerGroup-5eb78328af2f5c079a1e9352").bulkWrite([
  { deleteMany: { filter: { excluded: true } } },
]);
// End of pipeline 3
// -----------------------------------------------------------------
// Applying aggregation pipeline #4 Transaction - Have Not
// -----------------------------------------------------------------
// Pipeline #4 (HAVE NOT, transaction fields): flags with excluded:true every customer in the
// customerGroup collection who has a transaction matching any of the exclusion filters.
// Customers that are not already in the collection are discarded by the $merge stage.
// Written for mongosh: db.runCommand() is the shell helper for raw database commands, the
// aggregate command requires the cursor option, and ids are passed as ObjectId(...) values
// rather than as PHP-stringified "MongoDB\BSON\ObjectId(...)" text, which would only ever
// match a literal string.
db.runCommand({
  aggregate: "app.customerTransaction",
  pipeline: [
    {
      $match: {
        // $or the HAVE NOT's together
        $or: [
          {
            // The Include filter of "Haven't Purchased With Product Categories X Within Y Days,
            // productCategories: SEAFOOD - SPECIAL, days: 2" gets converted into an exclude of
            // those who have purchased. So we select and exclude anyone who's purchased from
            // the Special Seafood category of food in the last 2 days.
            transactionDate: { $gte: 1588896000 },
            productCategories: { $in: [ObjectId("5e79a458af2f5c1f95415067")] },
          },
          {
            // The Exclude filter of "Have Purchased Products X Within Y Days,
            // products: 180 PROT BAR VEGAN CHOC MINT 40G, days: 90" gets converted into us
            // selecting and excluding those who've purchased the Vegan protein bar in the
            // last 90 days.
            transactionDate: { $gte: 1581292800 },
            products: { $in: [ObjectId("5e916292af2f5c0896207e37")] },
          },
        ],
      },
    },
    {
      // Collapse the matching transactions to one document per distinct customerId
      $group: { _id: "$customerId" },
    },
    {
      // Flag the matched customers for removal
      $addFields: { excluded: true },
    },
    {
      $merge: {
        into: "customerGroup-5eb78328af2f5c079a1e9352",
        on: "_id",
        whenMatched: "merge",
        whenNotMatched: "discard",
      },
    },
  ],
  cursor: {},
});
// Remove the excluded customers.
// bulkWrite() takes operation documents keyed by the operation name; deleteMany removes
// every document that matches its filter.
db.getCollection("customerGroup-5eb78328af2f5c079a1e9352").bulkWrite([
  { deleteMany: { filter: { excluded: true } } },
]);
// End of pipeline 4
// -----------------------------
// We are using "customerGroup-<customerGroupId>" for the merged collection results name, so each customerGroup has its own merged collection with results.
// The collection customerGroup-5eb78328af2f5c079a1e9352 should now have a set of _id's which are the customerId's which match all the filters.
// We can now load up the actual customer models in batches based on those _id's and process them, sending push notifications, coupons, surveys, etc.
// We save a customerGroup.lastRan unixtime field so we know if the merged results are stale and need re-creating.
// Note: Whilst these are nearly valid MongoDB commands, they have been modified and are based on the PHP output of the commands that were run.
// You'll want to change "MongoDB\\BSON\\ObjectId(5e79a458af2f5c1f95415062)" to something like ObjectId("5e79a458af2f5c1f95415062")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment