Copy data from one MongoDB collection to another collection
// Copies data from the MongoDB collection <collection1> to <collection2>,
// preserving the _id field. Documents whose _id already exists in the
// target collection are skipped, so the script is safe to re-run.
const { MongoClient } = require('mongodb');
const fs = require('fs');

const url = 'mongodb+srv://<username>:<password>@<clustername>.mongodb.net/<dbname>?retryWrites=true&w=majority';
const databaseName = '<database_name>';
const collectionOne = 'collection1';
const collectionTwo = 'collection2';

console.log('Starting mongodb fixes');
let continueProcessing = true;
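// Usage (a sketch, assuming the script is saved as copy-collection.js and run
// with Node.js 16+, which provides the global performance API used below):
//   npm install mongodb
//   node copy-collection.js
// Fill in the placeholder connection string and names above first. Progress is
// checkpointed to lastValue.txt, so an interrupted run can simply be restarted.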
async function runFix() {
    while (continueProcessing) {
        await main().catch(console.error);
        if (continueProcessing) {
            console.log('Waiting 10 seconds before trying again');
            // maybe the network is terrible
            await sleep(10000);
            console.log('Trying again');
        } else {
            console.log('Completed, exiting');
        }
    }
}

runFix();
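// main() performs one full pass: it validates that the database and the
// source collection exist, loads the _ids already present in the target,
// then copies the source in batches of batchSize documents, checkpointing
// its offset to lastValue.txt before each batch.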
async function main() {
    console.log('Connecting to database');
    const client = new MongoClient(url);
    try {
        // Connect to the MongoDB cluster
        await client.connect();

        // Bail out early if the target database is missing
        const databases = await listDatabases(client);
        const databaseNames = databases.map(db => db.name);
        if (!databaseNames.includes(databaseName)) {
            console.log('Database does not exist');
            continueProcessing = false;
            return;
        }
        console.log('Database exists');

        const _database = client.db(databaseName);
        const collections = await _database.listCollections().toArray();
        const collectionNames = collections.map(collection => collection.name);
        if (!collectionNames.includes(collectionOne)) {
            console.log('Collection ' + collectionOne + ' does not exist');
            continueProcessing = false;
            return;
        }
        console.log('Collection exists');

        const collection1 = _database.collection(collectionOne);
        const collection2 = _database.collection(collectionTwo);
        console.log('Getting data in batches from ' + collectionOne);
        const sourceDataCount = await collection1.countDocuments();
        console.log('Got ' + sourceDataCount + ' records');
        const batchSize = 5000;

        // Fetch the _ids already present in the target once up front; running
        // the distinct query for every batch is what slows the script down.
        console.log('Getting distinct _id values from ' + collectionTwo);
        const _ids = await collection2.distinct('_id');
        // ObjectId instances are objects, so includes()/has() would compare
        // them by reference; serialising each one with JSON.stringify lets us
        // compare by value, and a Set makes each lookup O(1).
        const idsToSkip = new Set(_ids.map(val => JSON.stringify(val)));
        console.log('Got an initial ' + idsToSkip.size + ' ids to skip');
        // Read the last value of i from lastValue.txt so we can resume if a
        // previous run failed; default to 0 on a first run, when the file
        // does not exist yet.
        console.log('Reading lastValue.txt');
        let lastValue = 0;
        if (fs.existsSync('lastValue.txt')) {
            lastValue = parseInt(fs.readFileSync('lastValue.txt', 'utf8'), 10) || 0;
        }
        if (lastValue) {
            console.log('Resuming from last value of ' + lastValue + '\n');
        }
        for (let i = lastValue; i < sourceDataCount; i += batchSize) {
            // Checkpoint the current offset so a failed run can resume here
            fs.writeFileSync('lastValue.txt', i.toString(), 'utf8');
            console.log('Getting batch ' + i + ' to ' + (i + batchSize) + ' from ' + collectionOne);
            const startTime = performance.now();
            // Sort by _id so skip/limit pagination is stable across batches
            const data = await collection1.find().sort({ _id: 1 }).skip(i).limit(batchSize).toArray();
            console.log('Got ' + data.length + ' records from ' + collectionOne + ' in ' + (performance.now() - startTime) / 1000 + ' seconds');

            const _time = performance.now();
            console.log('Filtering ' + data.length + ' records to remove duplicates');
            const dataToInsert = data.filter(item => !idsToSkip.has(JSON.stringify(item._id)));
            console.log('Filtered ' + data.length + ' records to ' + dataToInsert.length + ' records in ' + (performance.now() - _time) / 1000 + ' seconds\n');

            if (dataToInsert.length > 0) {
                console.log('Inserting data into ' + collectionTwo + ', ' + dataToInsert.length + ' records');
                await collection2.insertMany(dataToInsert);
                console.log('Inserted ' + dataToInsert.length + ' records into ' + collectionTwo);
            } else {
                console.log('No data to insert, skipping ' + data.length + ' records');
            }

            const endTime = performance.now();
            console.log('Batch ' + i + ' to ' + (i + batchSize) + ' took total time of ' + (endTime - startTime) / 1000 + ' seconds\n');
        }
        console.log('Done');
        continueProcessing = false;
    } catch (e) {
        console.error(e);
    } finally {
        // Close the connection to the MongoDB cluster
        await client.close();
    }
}
const listDatabases = async (client) => {
    const databasesList = await client.db().admin().listDatabases();
    console.log('Databases:');
    databasesList.databases.forEach(db => console.log(` - ${db.name}`));
    return databasesList.databases;
};
const sleep = (ms) => {
    return new Promise(resolve => setTimeout(resolve, ms));
};
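// A quick way to verify the copy afterwards (a sketch, run in mongosh against
// the same database; collection names are the placeholders used above):
//   db.collection2.countDocuments()   // should match db.collection1.countDocuments()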