Last active
January 29, 2018 23:50
-
-
Save benjamine/800562afc3a1e53375f8c4808cee4158 to your computer and use it in GitHub Desktop.
node script to fix mongo docs with controlled concurrency
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env node
/*
Requirements:
- node v8.9.3+
- npm install mongodb
- npm install p-queue
*/
const PQueue = require('p-queue'); | |
const mongodb = require('mongodb'); | |
const MongoClient = mongodb.MongoClient; | |
const ObjectId = mongodb.ObjectId; | |
/*
 * Script entry point: runs the batch fix with the parameters below.
 * Replace the INSERTYOUR... placeholders before running.
 */
runBatchFix({
  /* batch fix parameters */
  url: 'mongodb://INSERTYOURMONGOHOSTNAME:27017/',
  dbName: 'INSERTYOURMONGODBNAME',
  collection: 'INSERTYOURMONGOCOLLECTIONNAME',
  documentProjection: { _id: 1 }, // only get id of each doc
  filter: {
    // filter documents to process
    // _id: new ObjectId('11111c1c1b11b1c11a11cc1'),
  },
  // maximum number of fixDocument calls allowed to run at the same time
  concurrency: 10,
  /**
   * Example fix function: overwrites the `users` array of a customer
   * document with references to the matching documents of the `user`
   * collection.
   *
   * @param {object} doc - document selected by `filter`; only `_id` is read.
   * @param {object} db - database handle exposing `collection(name)`.
   * @returns {Promise<void>} settles once the customer update has been sent.
   */
  fixDocument: async (doc, db) => {
    // users point to their customer through a string copy of the customer id
    const users = await db.collection('user').find({
      customerId: doc._id.toString(),
    }).toArray();
    // build one { $ref, $id } reference per matching user
    const customerUsers = users.map((u) => ({
      $ref: 'user', $id: new ObjectId(u._id),
    }));
    await db.collection('customer').updateOne({
      _id: new ObjectId(doc._id),
    }, {
      $set: {
        users: customerUsers,
      },
    });
  },
});
/**
 * Connects to MongoDB, loads every document matching `filter` and runs
 * `fixDocument` on each of them, with at most `concurrency` fixes in flight.
 *
 * Any failure (connection, query or a rejected `fixDocument`) is logged and
 * terminates the process with exit code 1.
 *
 * @param {object} params
 * @param {string} params.url - MongoDB connection string.
 * @param {string} params.dbName - database to operate on.
 * @param {string} params.collection - collection whose documents get fixed.
 * @param {object} [params.documentProjection] - fields to load for each document.
 * @param {object} params.filter - query selecting the documents to process.
 * @param {number} params.concurrency - maximum number of simultaneous fixes.
 * @param {function(object, object): Promise} params.fixDocument - called with
 *   (document, db) for every selected document.
 * @returns {Promise<void>} resolves once every document has been fixed and
 *   the client has been closed.
 */
function runBatchFix({
  url, dbName, collection,
  documentProjection,
  filter,
  concurrency,
  fixDocument,
}) {
  return main().catch((err) => {
    console.error(err);
    process.exit(1);
  });

  // Opens the connection, runs the batch and releases the connection.
  async function main() {
    // promise form of connect: the callback form is not available in
    // recent driver releases
    const client = await MongoClient.connect(url);
    console.log('connected to', url);
    await fixThemAll(client);
    // closing the client lets the process end on its own, without
    // calling process.exit() from inside a worker
    await client.close();
  }

  // Loads the selected documents and pushes them through the queue.
  async function fixThemAll(client) {
    const db = client.db(dbName);
    console.log('counting documents at', dbName, ', collection', collection, filter);
    const found = db.collection(collection).find(filter);
    // project() is used because a bare projection passed as find()'s second
    // argument is read as an options object by newer drivers and ignored
    const cursor = documentProjection ? found.project(documentProjection) : found;
    const documents = await cursor.toArray();
    // the total comes from the fetched array itself, so progress and
    // completion cannot disagree with a separately issued count query
    const count = documents.length;
    if (count < 1) {
      console.log('zero documents found');
      return;
    }
    console.log('fixing', count, 'document(s)');
    // eslint-disable-next-line fp/no-let
    let fixes = 0;
    const queue = new PQueue({ concurrency });
    const start = Date.now();
    // awaiting every queued task makes a failing fix reject here instead of
    // becoming an unhandled rejection that leaves the run hanging
    await Promise.all(documents.map((doc) => queue.add(() => fixOne(doc))));
    console.log('done! took ', (Date.now() - start) / 1000, 'seconds');

    // Fixes a single document and reports progress.
    async function fixOne(doc) {
      await fixDocument(doc, db);
      // eslint-disable-next-line fp/no-mutation
      fixes++;
      // average time per finished fix, times the fixes left, in seconds
      const eta = ((Date.now() - start) / fixes) *
        (count - fixes) / 1000;
      console.log('doc', doc._id, 'fixed,', fixes, 'of', count,
        'ETA', eta, 'seconds');
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment