Created
May 12, 2018 06:58
-
-
Save slidenerd/f3019b03f5ea9023dfb365d95e60d161 to your computer and use it in GitHub Desktop.
test mongodb alerts with aggregation pipeline
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const | |
fs = require('fs'), | |
fse = require('fs-extra'), | |
MongoClient = require('mongodb').MongoClient, | |
mongoose = require('mongoose'), | |
ObjectID = require('mongodb').ObjectID, | |
request = require('request') | |
// Fiat currency codes the tests are restricted to.
// When this list is empty every quoted currency is accepted.
const includes = ["AUD", "BRL", "CAD", "CHF", "CLP", "CNY", "CZK", "DKK", "EUR", "GBP", "HKD", "HUF", "IDR", "ILS", "INR", "JPY", "KRW", "MXN", "MYR", "NOK", "NZD", "PHP", "PKR", "PLN", "RUB", "SEK", "SGD", "THB", "TRY", "TWD", "USD", "ZAR"]

/**
 * Converts a `{ quotes: { USDxxx: rate } }` payload into a flat list of
 * `{ symbol, price }` entries, keeping only the currencies listed in `includes`
 * (or all of them when `includes` is empty).
 *
 * @param {{quotes?: Object<string, number>}} [fiats] parsed quotes payload
 * @returns {Array<{symbol: string, price: number}>} one entry per accepted quote;
 *          an empty array when the payload has no `quotes`
 */
function normalizeDestinations(fiats = {}) {
    // A missing payload or missing `quotes` yields no destinations instead of a TypeError
    const quotes = (fiats && fiats.quotes) || {}
    const restrict = Boolean(includes && includes.length)
    const normalizedFiats = []
    for (const pair of Object.keys(quotes)) {
        // Only strip the leading source currency; "USD" elsewhere in the key is part of the symbol
        const symbol = pair.startsWith("USD") ? pair.slice(3) : pair
        if (!restrict || includes.includes(symbol)) {
            normalizedFiats.push({ symbol, price: quotes[pair] })
        }
    }
    return normalizedFiats
}
/**
 * Generates random price alerts for random (source, destination) pairs and
 * derives the companion data sets used by the tests.
 *
 * Alert documents use numeric field names:
 *   alerts:          1 user id, 2 source id, 3 destination symbol, 4 "source:destination",
 *                    5 alert price, 6 direction (false = less-than, true = greater-than), 7 type (always 0)
 *   condensedAlerts: 1 user id, 2 "source:destination", 3 alert price, 4 direction, 5 type
 *   unique:          _id "source:destination", 1 source id, 2 destination symbol,
 *                    3 current pair price, 4 number of alerts on the pair
 *   condensedUnique: _id "source:destination", 1 current pair price, 2 number of alerts
 *
 * @param {Array<{id: string, price_usd: *}>} [sources] sources; entries whose price_usd is null/undefined are never picked
 * @param {Array<{symbol: string, price: number}>} [destinations] destinations
 * @param {number} [count] number of alerts to generate
 * @param {number} [sourceLimit] only the first `sourceLimit` sources are candidates
 * @param {number} [destinationLimit] only the first `destinationLimit` destinations are candidates
 * @param {number} [userCount] size of the random user-id pool
 * @param {number} [alertProbability] currently unused; kept for interface compatibility
 * @returns {{alerts: Object[], unique: Object[], condensedAlerts: Object[], condensedUnique: Object[], query: Object}}
 *          `query` is a find() filter over the condensed alert layout. When no priced
 *          source or no destination is available, all lists are empty.
 */
function getAlerts(sources = [], destinations = [], count = 10, sourceLimit = 10, destinationLimit = 10, userCount = 100, alertProbability = 0.1) {
    // Uniform random price within +/- multiplier of the current pair price, rounded to 2 decimals
    const getRandomPrice = (sourcePrice, destinationPrice, multiplier = 0.1) => {
        const product = sourcePrice * destinationPrice
        const upper = product + multiplier * product
        const lower = product - multiplier * product
        return +((lower + (upper - lower) * Math.random()).toFixed(2))
    }
    const pickRandom = list => list[Math.floor(Math.random() * list.length)]
    const hasPrice = source => source.price_usd !== null && source.price_usd !== undefined

    const userIds = []
    for (let i = 0; i < userCount; i++) {
        userIds.push(new ObjectID())
    }

    // Filter up front instead of retrying random picks: retrying never terminates
    // when none of the candidate sources carries a price.
    const eligibleSources = sources.slice(0, Math.max(0, sourceLimit)).filter(hasPrice)
    const eligibleDestinations = destinations.slice(0, Math.max(0, destinationLimit))
    const canGenerate = eligibleSources.length > 0 && eligibleDestinations.length > 0

    const alerts = [], condensedAlerts = []
    const uniqueAlerts = new Map()
    for (let i = 0; canGenerate && i < count; i++) {
        const source = pickRandom(eligibleSources)
        const destination = pickRandom(eligibleDestinations)
        const id = source.id + ":" + destination.symbol
        const randomUserId = pickRandom(userIds)
        const randomPrice = getRandomPrice(+source.price_usd, destination.price)
        const randomDirection = Math.floor(Math.random() * 2) !== 0
        alerts.push({
            _id: new ObjectID(), // unique alert id
            1: randomUserId, // user id
            2: source.id, // from
            3: destination.symbol, // to
            4: id,
            5: randomPrice,
            6: randomDirection, // false = less than alert, true = greater than alert
            7: 0, // type, 0 = price alert
        })
        condensedAlerts.push({
            _id: new ObjectID(), // unique alert id
            1: randomUserId, // user id
            2: id,
            3: randomPrice,
            4: randomDirection, // false = less than alert, true = greater than alert
            5: 0, // type, 0 = price alert
        })
        const known = uniqueAlerts.get(id)
        if (known) {
            known['4'] += 1
        }
        else {
            uniqueAlerts.set(id, {
                _id: id,
                1: source.id,
                2: destination.symbol,
                3: +source.price_usd * destination.price,
                4: 1
            })
        }
    }

    const unique = [], queries = [], condensedUnique = []
    for (const [key, entry] of uniqueAlerts) {
        unique.push({ _id: key, 1: entry['1'], 2: entry['2'], 3: entry['3'], 4: entry['4'] })
        condensedUnique.push({ _id: key, 1: entry['3'], 2: entry['4'] })
        // Condensed layout: 2 = pair, 3 = alert price, 4 = direction
        queries.push({
            2: { $eq: key },
            $or: [
                { 4: { $eq: false }, 3: { $gte: entry['3'] } },
                { 4: { $eq: true }, 3: { $lte: entry['3'] } },
            ]
        })
    }
    const query = {
        $or: queries
    }
    return { alerts, unique, condensedAlerts, condensedUnique, query }
}
/**
 * Builds one upserting bulkWrite operation per source.
 *
 * Each operation targets the document whose `_id` is the source id and sets
 * field 1 to the numeric USD price and field 2 to the symbol.
 *
 * @param {Array<{id: string, price_usd: *, symbol: string}>} [sources]
 * @returns {Object[]} operations suitable for `collection.bulkWrite`
 */
function getSourceOperations(sources = []) {
    return sources.map(source => ({
        updateOne: {
            filter: { _id: source.id },
            // updateOne requires update operators; on upsert the _id is taken from the filter
            update: {
                $set: {
                    1: +source.price_usd,
                    2: source.symbol
                }
            },
            upsert: true
        }
    }))
}
/**
 * Builds one upserting bulkWrite operation per destination currency.
 *
 * Destinations whose symbol is not listed in the module-level `includes`
 * array are skipped; when `includes` is empty every destination is kept.
 * Each operation sets field 1 of the document keyed by the symbol to the price.
 *
 * @param {Array<{symbol: string, price: number}>} [destinations]
 * @returns {Object[]} operations suitable for `collection.bulkWrite`
 */
function getDestinationOperations(destinations = []) {
    const restrict = Boolean(includes && includes.length)
    return destinations
        .filter(destination => !restrict || includes.includes(destination.symbol))
        .map(destination => ({
            updateOne: {
                filter: { _id: destination.symbol },
                // updateOne requires update operators rather than a bare replacement document
                update: {
                    $set: {
                        1: destination.price
                    }
                },
                upsert: true
            }
        }))
}
/**
 * Runs the given bulkWrite operations against a collection of the local
 * `testalerts` database and logs the elapsed time together with the result counters.
 *
 * Errors are logged, never thrown. The connection is closed once bulkWrite
 * has completed, whether it succeeded or failed.
 *
 * @param {string} collectionName target collection
 * @param {Object[]} operations bulkWrite operations
 */
function upsertNative(collectionName, operations) {
    if (!operations || !operations.length) {
        // Nothing to write, so don't open a connection
        console.log("upsertNative", collectionName, "no operations to run")
        return
    }
    MongoClient.connect('mongodb://localhost:27017', function (err, client) {
        if (err) {
            // No client was created, so there is nothing to close
            console.log(err, "upsertNative", collectionName, "connection failed")
            return
        }
        const db = client.db('testalerts')
        const t1 = new Date().getTime()
        db.collection(collectionName).bulkWrite(operations, (error, result) => {
            const t2 = new Date().getTime()
            if (error) {
                console.log(error, "upsertNative", collectionName, (t2 - t1) / 1000, "seconds")
            }
            else {
                console.log(
                    "upsertNative",
                    collectionName,
                    (t2 - t1) / 1000,
                    "seconds",
                    "upserted", result.upsertedCount,
                    "inserted", result.insertedCount,
                    "deleted", result.deletedCount,
                    "matched", result.matchedCount,
                    "modified", result.modifiedCount
                )
            }
            // Close on both paths so a failed write does not leave the connection open
            client.close()
        })
    })
}
/**
 * Replaces the contents of a collection in the local `testalerts` database:
 * drops the collection if it exists, then inserts `items`.
 *
 * Failures are logged with console.log; the connection is closed afterwards
 * in every case.
 *
 * @param {string} collectionName collection to recreate
 * @param {Object[]} items documents to insert
 */
function deleteInsert(collectionName, items) {
    MongoClient.connect('mongodb://localhost:27017', function (err, client) {
        if (err) {
            // No client was created, so there is nothing to close
            console.log(err, "deleteInsert", collectionName, "connection failed")
            return
        }
        const db = client.db('testalerts')
        db.listCollections()
            .toArray()
            .then(collections => {
                // Only drop when the collection is actually listed
                const found = collections.some(collection => collection.name === collectionName)
                if (found) {
                    return db.collection(collectionName).drop()
                }
            })
            .then(result => {
                if (result) {
                    console.log("Deleted", collectionName, result)
                }
                return db.collection(collectionName).insertMany(items)
            })
            .then(result => {
                console.log("Inserted", collectionName, result.insertedCount, result.result.n, result.result.ok)
            })
            .catch(console.log)
            .then(() => {
                client.close()
            })
    })
}
/**
 * Runs an aggregation pipeline against a collection of the local `testalerts`
 * database, logs how long it took, and writes the first 20 result documents
 * to `fileName` as indented JSON.
 *
 * Errors are logged, never thrown. Nothing is written when the aggregation fails.
 *
 * @param {string} collectionName collection to aggregate over
 * @param {string} fileName path of the JSON file receiving the result sample
 * @param {Object[]} pipeline aggregation stages
 */
function aggregate(collectionName, fileName, pipeline) {
    MongoClient.connect('mongodb://localhost:27017', function (err, client) {
        if (err) {
            // No client was created, so there is nothing to close
            console.log(err, "Aggregation", collectionName, "connection failed")
            return
        }
        const db = client.db('testalerts')
        const t1 = new Date().getTime()
        // The driver option is spelled allowDiskUse
        db.collection(collectionName).aggregate(pipeline, { allowDiskUse: true }).toArray((error, result) => {
            const t2 = new Date().getTime()
            if (error) {
                console.log(error, "Aggregation", collectionName, (t2 - t1) / 1000, "seconds")
            }
            else {
                console.log("Aggregation", collectionName, (t2 - t1) / 1000, "seconds returned", result.length, "documents")
                // result only exists on success; writeJson returns a promise whose failure must be reported
                fse.writeJson(fileName, result.slice(0, 20), { spaces: 4 }).catch(console.log)
            }
            client.close()
        })
    })
}
/**
 * Downloads a JSON document from `url` and stores it in `fileName`.
 * Does nothing unless `refresh` is true.
 *
 * Request errors, non-200 responses, unparsable bodies and write failures
 * are logged; none of them is thrown.
 *
 * @param {string} url address to fetch
 * @param {string} fileName path of the JSON file to write
 * @param {boolean} [refresh] set to true to actually perform the download
 */
function refreshData(url, fileName, refresh = false) {
    if (!refresh) {
        return
    }
    request(url, { encoding: 'utf-8' }, (error, response, body) => {
        if (error) {
            console.log(error)
            return
        }
        if (response.statusCode !== 200) {
            console.log(response.statusCode, response.statusMessage)
            return
        }
        let coins
        try {
            coins = JSON.parse(body)
        }
        catch (parseError) {
            // A malformed body would otherwise throw out of the request callback
            console.log(parseError, "refreshData", url)
            return
        }
        // coins.length is undefined when the payload is an object rather than an array
        console.log("Saving", coins.length, "coins", fileName)
        fse.writeJson(fileName, coins, {}).catch(console.log)
    })
}
/**
 * Runs a find() with the given filter against a collection of the local
 * `testalerts` database and logs the elapsed time and the number of matches.
 *
 * Errors are logged, never thrown; the connection is closed once the query finished.
 *
 * @param {string} collectionName collection to query
 * @param {Object} query find() filter
 */
function queryAlerts(collectionName, query) {
    MongoClient.connect('mongodb://localhost:27017', function (err, client) {
        if (err) {
            // No client was created, so there is nothing to close
            console.log(err, "query alerts", collectionName, "connection failed")
            return
        }
        const db = client.db('testalerts')
        const t1 = new Date().getTime()
        db.collection(collectionName).find(query).toArray((error, result) => {
            const t2 = new Date().getTime()
            if (error) {
                console.log(error, "query alerts", collectionName, (t2 - t1) / 1000, "seconds")
            }
            else {
                console.log("query alerts", collectionName, (t2 - t1) / 1000, "seconds returned", result.length, "documents")
            }
            client.close()
        })
    })
}
module.exports = { | |
aggregate, | |
deleteInsert, | |
getAlerts, | |
getDestinationOperations, | |
getSourceOperations, | |
normalizeDestinations, | |
queryAlerts, | |
upsertNative, | |
} | |
refreshData("https://api.coinmarketcap.com/v1/ticker/?limit=0", "cmc.json", false) | |
refreshData("http://apilayer.net/api/live?access_key=f8b2013585398ee39a1ef56fc6caf458¤cies=&source=USD&format=2", "fiats.json", false) | |
Index.js | |
const | |
fs = require('fs'), | |
fse = require('fs-extra'), | |
utils = require('./utils'); | |
/**
 * Builds the pipeline that starts from "e1_alerts", joins each alert with its
 * source ("e1_sources" via field 2) and destination ("e1_destinations" via
 * field 3), computes the product of the two joined prices and keeps the
 * alerts whose comparison flag disagrees with field 5.
 *
 * NOTE(review): getAlerts in the utils file stores the pair id in field 4, the
 * alert price in field 5 and the direction in field 6, whereas this pipeline
 * compares field 4 with the price and matches on field 5 as a boolean —
 * presumably written for an earlier document layout; confirm before use.
 *
 * @returns {{collectionName: string, pipeline: Object[]}}
 */
function aggregateAlerts() {
    // Fresh object per call site so no stage shares a reference with another
    const currentPrice = () => ({ $multiply: ["$s", "$d.1"] })
    const joinById = (from, localField, as) => ({
        $lookup: { from, localField, foreignField: "_id", as }
    })

    const pipeline = [
        joinById("e1_sources", "2", "s"),
        { $unwind: "$s" },
        { $project: { _id: 0, 1: 1, 2: 1, 3: 1, 4: 1, 5: 1, s: "$s.1" } },
        joinById("e1_destinations", "3", "d"),
        { $unwind: "$d" },
        {
            $project: {
                1: 1, 2: 1, 3: 1, 4: 1, 5: 1,
                m: currentPrice(),
                c: { $gte: ["$4", currentPrice()] }
            }
        },
        {
            $match: {
                $or: [
                    { 5: false, c: true },
                    { 5: true, c: false }
                ]
            }
        }
    ]
    return { collectionName: "e1_alerts", pipeline }
}
/**
 * Builds the pipeline that starts from "e1_unique_alerts", resolves the
 * current pair price with two field-based $lookup joins (sources, then
 * destinations), joins the matching "e1_alerts" documents with a third
 * field-based $lookup and finally filters the triggered alerts with $expr.
 *
 * NOTE(review): the third join uses field 3 of the unique alert as the pair
 * key, while getAlerts in the utils file stores the pair in `_id` and the
 * price in field 3 — presumably written for an earlier layout; confirm.
 *
 * @returns {{collectionName: string, pipeline: Object[]}}
 */
function aggregateUniqueAlertsNoPipelineMultipleLookups() {
    const fieldJoin = (from, localField, foreignField, as) => ({
        $lookup: { from, localField, foreignField, as }
    })
    // direction false fires when alert price >= current price, direction true when <=
    const triggered = {
        $or: [
            { $and: [{ $eq: ["$4", false] }, { $gte: ["$3", "$6"] }] },
            { $and: [{ $eq: ["$4", true] }, { $lte: ["$3", "$6"] }] }
        ]
    }

    const pipeline = [
        fieldJoin("e1_sources", "1", "_id", "s"),
        { $unwind: "$s" },
        { $project: { _id: 0, 1: 1, 2: 1, 3: 1, 4: "$s.1" } },
        fieldJoin("e1_destinations", "2", "_id", "d"),
        { $unwind: "$d" },
        { $project: { 1: 1, 2: 1, 3: 1, 4: { $multiply: ["$4", "$d.1"] } } },
        fieldJoin("e1_alerts", "3", "4", "a"),
        { $unwind: "$a" },
        {
            $project: {
                _id: "$a._id",
                1: "$a.1",
                2: "$3",
                3: "$a.5",
                4: "$a.6",
                5: "$a.7",
                6: "$4"
            }
        },
        { $match: { $expr: triggered } }
    ]
    return { collectionName: "e1_unique_alerts", pipeline }
}
//all cryptos = 1525, all fiats = 32, unique alerts count = 42445 alerts count = 100000, time taken = 17.798 seconds | |
//created indexes in the e1_alerts table for source, destination and source:destination combination | |
/**
 * Builds the pipeline that starts from "e1_unique_alerts", resolves the
 * current pair price with two field-based $lookup joins, and then joins
 * "e1_alerts" with a sub-pipeline $lookup that already filters the triggered
 * alerts (pair equality plus direction/price comparison) inside the join.
 *
 * NOTE(review): field 3 of the unique alert is used as the pair key here,
 * while getAlerts in the utils file stores the pair in `_id` — presumably
 * written for an earlier layout; confirm.
 *
 * @returns {{collectionName: string, pipeline: Object[]}}
 */
function aggregateUniqueAlertsPipelineLastStage() {
    const fieldJoin = (from, localField, as) => ({
        $lookup: { from, localField, foreignField: "_id", as }
    })
    // Inside the sub-pipeline: $4 = pair, $5 = alert price, $6 = direction of the alert document
    const triggeredForPair = {
        $and: [
            { $eq: ["$$pair", "$4"] },
            {
                $or: [
                    { $and: [{ $eq: ["$6", false] }, { $gte: ["$5", "$$price"] }] },
                    { $and: [{ $eq: ["$6", true] }, { $lte: ["$5", "$$price"] }] }
                ]
            }
        ]
    }
    const alertJoin = {
        $lookup: {
            from: "e1_alerts",
            let: { pair: "$3", price: "$4" },
            pipeline: [{ $match: { $expr: triggeredForPair } }],
            as: "a"
        }
    }

    const pipeline = [
        fieldJoin("e1_sources", "1", "s"),
        { $unwind: "$s" },
        { $project: { _id: 0, 1: 1, 2: 1, 3: 1, 4: "$s.1" } },
        fieldJoin("e1_destinations", "2", "d"),
        { $unwind: "$d" },
        { $project: { 1: 1, 2: 1, 3: 1, 4: { $multiply: ["$4", "$d.1"] } } },
        alertJoin,
        { $unwind: "$a" },
        {
            $project: {
                _id: "$a._id",
                1: "$a.1",
                2: "$3",
                3: "$a.5",
                4: "$a.6",
                5: "$a.7",
                6: "$4"
            }
        }
    ]
    return { collectionName: "e1_unique_alerts", pipeline }
}
//all cryptos = 1525, all fiats = 32, unique alerts count = 42445 alerts count = 100000, time taken = 24.5 seconds | |
//created indexes in the e1_alerts table for source, destination and source:destination combination, pipelines = bad | |
/**
 * Builds the pipeline that starts from "e1_unique_alerts" and performs all
 * three joins (sources, destinations, alerts) with sub-pipeline $lookup
 * stages; the alert join filters on source, destination and the
 * direction/price comparison inside the join.
 *
 * NOTE(review): the alert sub-pipeline reads the price from field 4 and the
 * direction from field 5, while getAlerts in the utils file stores them in
 * fields 5 and 6 — presumably written for an earlier layout; confirm.
 *
 * @returns {{collectionName: string, pipeline: Object[]}}
 */
function aggregateUniqueAlertsPipelineAll() {
    // Sub-pipeline join on _id that keeps only the fields allowed by `projection`
    const idJoin = (from, variable, localPath, projection, as) => ({
        $lookup: {
            from,
            let: { [variable]: localPath },
            pipeline: [
                { $match: { $expr: { $eq: ["$_id", "$$" + variable] } } },
                { $project: projection }
            ],
            as
        }
    })
    const triggeredForPair = {
        $and: [
            { $eq: ["$$source", "$2"] },
            { $eq: ["$$destination", "$3"] },
            {
                $or: [
                    { $and: [{ $eq: ["$5", false] }, { $gte: ["$4", "$$price"] }] },
                    { $and: [{ $eq: ["$5", true] }, { $lte: ["$4", "$$price"] }] }
                ]
            }
        ]
    }
    const alertJoin = {
        $lookup: {
            from: "e1_alerts",
            let: { source: "$1", destination: "$2", price: "$3" },
            pipeline: [{ $match: { $expr: triggeredForPair } }],
            as: "a"
        }
    }

    const pipeline = [
        idJoin("e1_sources", "source", "$1", { 1: 1, _id: 0 }, "s"),
        { $unwind: "$s" },
        { $project: { _id: 0, 1: 1, 2: 1, 3: "$s.1" } },
        idJoin("e1_destinations", "destination", "$2", { _id: 0 }, "d"),
        { $unwind: "$d" },
        { $project: { 1: 1, 2: 1, 3: { $multiply: ["$3", "$d.1"] } } },
        alertJoin,
        { $unwind: "$a" },
        {
            $project: {
                _id: "$a._id",
                1: "$a.1",
                2: "$1",
                3: "$2",
                4: "$a.4",
                5: "$a.5",
                6: "$a.6",
                7: "$3"
            }
        }
    ]
    return { collectionName: "e1_unique_alerts", pipeline }
}
/**
 * Builds the pipeline that starts from "e1_unique_alerts" and joins
 * "e1_alerts" on the pair id (unique `_id` against alert field 4), taking the
 * current price from field 3 of the unique alert instead of looking it up in
 * the source/destination collections. Triggered alerts are then selected
 * with an $expr match.
 *
 * @returns {{collectionName: string, pipeline: Object[]}}
 */
function aggregateUniqueAlertsNoLookups() {
    const joinAlerts = {
        $lookup: { from: "e1_alerts", localField: "_id", foreignField: "4", as: "a" }
    }
    // After this stage: 3 = alert price, 4 = direction, 6 = current pair price
    const flatten = {
        $project: {
            _id: "$a._id",
            1: "$a.1",
            2: "$a.4",
            3: "$a.5",
            4: "$a.6",
            5: "$a.7",
            6: "$3"
        }
    }
    const lessThanHit = { $and: [{ $eq: ["$4", false] }, { $gte: ["$3", "$6"] }] }
    const greaterThanHit = { $and: [{ $eq: ["$4", true] }, { $lte: ["$3", "$6"] }] }
    const keepTriggered = { $match: { $expr: { $or: [lessThanHit, greaterThanHit] } } }

    return {
        collectionName: "e1_unique_alerts",
        pipeline: [joinAlerts, { $unwind: "$a" }, flatten, keepTriggered]
    }
}
/**
 * Condensed-layout counterpart of the no-lookup aggregation: starts from
 * "e1_unique_alerts_condensed", joins "e1_alerts_condensed" on the pair id
 * (unique `_id` against alert field 2), carries the current price from
 * field 1 of the unique alert and keeps the triggered alerts via $expr.
 *
 * @returns {{collectionName: string, pipeline: Object[]}}
 */
function aggregateUniqueAlertsCondensedNoLookups() {
    const joinAlerts = {
        $lookup: { from: "e1_alerts_condensed", localField: "_id", foreignField: "2", as: "a" }
    }
    // After this stage: 3 = alert price, 4 = direction, 6 = current pair price
    const flatten = {
        $project: {
            _id: "$a._id",
            1: "$a.1",
            2: "$a.2",
            3: "$a.3",
            4: "$a.4",
            5: "$a.5",
            6: "$1"
        }
    }
    const lessThanHit = { $and: [{ $eq: ["$4", false] }, { $gte: ["$3", "$6"] }] }
    const greaterThanHit = { $and: [{ $eq: ["$4", true] }, { $lte: ["$3", "$6"] }] }
    const keepTriggered = { $match: { $expr: { $or: [lessThanHit, greaterThanHit] } } }

    return {
        collectionName: "e1_unique_alerts_condensed",
        pipeline: [joinAlerts, { $unwind: "$a" }, flatten, keepTriggered]
    }
}
/**
 * Rebuilds the test database from the local fiats.json and cmc.json snapshots.
 * Does nothing unless `refresh` is true.
 *
 * It upserts sources and destinations, generates 10000 random alerts on the
 * first source and first destination, writes the generated find() filter to
 * query.json, recreates the four alert collections and, 30 seconds later,
 * runs the filter against "e1_alerts_condensed".
 *
 * The utils database helpers are fire-and-forget (they take no callback and
 * return nothing), hence the fixed delay before querying.
 *
 * @param {boolean} [refresh] set to true to actually rebuild
 * @returns {Promise<void>} rejects when one of the JSON snapshots cannot be read
 */
async function refreshDatabase(refresh = false) {
    if (!refresh) {
        return
    }
    const fiats = utils.normalizeDestinations(await fse.readJson("fiats.json"))
    const cryptos = await fse.readJson("cmc.json")

    utils.upsertNative("e1_sources", utils.getSourceOperations(cryptos))
    utils.upsertNative("e1_destinations", utils.getDestinationOperations(fiats))

    const { alerts, unique, condensedAlerts, condensedUnique, query } = utils.getAlerts(cryptos, fiats, 10000, 1, 1)
    // To inspect the generated fixtures, dump alerts / unique / condensedAlerts /
    // condensedUnique with fse.writeJson here.
    // Writing the query file is best-effort: report a failure instead of leaving the promise unhandled
    fse.writeJson("query.json", query, { spaces: 4 }).catch(console.log)

    utils.deleteInsert("e1_alerts", alerts)
    utils.deleteInsert("e1_unique_alerts", unique)
    utils.deleteInsert("e1_alerts_condensed", condensedAlerts)
    utils.deleteInsert("e1_unique_alerts_condensed", condensedUnique)

    // Give the inserts above time to finish before querying
    setTimeout(() => {
        utils.queryAlerts("e1_alerts_condensed", query)
    }, 30000)
}
// Entry point: rebuilds the database, then optionally runs one of the
// aggregation benchmarks (toggle run1 / run2 by hand).
(function () {
    // refreshDatabase is async; report a failed rebuild (e.g. a missing snapshot
    // file) instead of leaving an unhandled rejection
    refreshDatabase(true).catch(console.log)
    const run1 = false
    const run2 = false
    if (run1) {
        const { collectionName, pipeline } = aggregateUniqueAlertsNoLookups()
        utils.aggregate(collectionName, "aggregation.json", pipeline)
    }
    if (run2) {
        const { collectionName, pipeline } = aggregateUniqueAlertsCondensedNoLookups()
        utils.aggregate(collectionName, "aggregation_condensed.json", pipeline)
    }
})()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment