Skip to content

Instantly share code, notes, and snippets.

@donsn
Created April 6, 2023 15:19
Show Gist options
  • Save donsn/de2da045dd93ed3b8f3be32f2a38d398 to your computer and use it in GitHub Desktop.
Copy data from one MongoDB collection to another collection
// This copies data from the mongodb collection <collection1> to <collection2> preserving the _id field, it will skip if the _id already exists
console.log('Starting mongodb fixes');
const { MongoClient } = require('mongodb');
const fs = require('fs');
// Connection string — fill in credentials/cluster before running.
const url = "mongodb+srv://<username>:<password>@<clustername>.mongodb.net/<dbname>?retryWrites=true&w=majority";
const databaseName = '<database_name>';
const collectionOne = 'collection1'; // source collection
const collectionTwo = 'collection2'; // destination collection
// Flipped to false by main() when the copy completes (or the configuration
// is found invalid) so the retry loop in runFix() exits.
let continueProcessing = true;
/**
 * Drives main() in a retry loop: while work remains (continueProcessing
 * still true after a pass), wait 10 seconds and run main() again. Errors
 * from main() are logged and treated as a reason to retry.
 */
async function runFix() {
    while (continueProcessing) {
        await main().catch(console.error);
        if (!continueProcessing) {
            console.log('Completed, exiting');
            continue; // loop condition is now false; this exits cleanly
        }
        console.log('Waiting 10 seconds before trying again');
        // maybe the network is terrible
        await sleep(10000);
        console.log('Trying again');
    }
}
runFix();
/**
 * Performs one copy pass: connects, validates that the database and source
 * collection exist, then copies documents from collectionOne to
 * collectionTwo in batches of 5000, skipping _ids already present in the
 * target. The current batch offset is checkpointed to lastValue.txt so an
 * interrupted run resumes where it left off. Sets continueProcessing =
 * false on completion or on a fatal misconfiguration; on other errors it
 * leaves the flag true so runFix() retries.
 */
async function main() {
    console.log('Connecting to database');
    const client = new MongoClient(url, { useNewUrlParser: true });
    try {
        // Connect to the MongoDB cluster
        await client.connect();
        // Make the appropriate DB calls
        const databases = await listDatabases(client);
        const databaseNames = databases.map(db => db.name);
        if (!databaseNames.includes(databaseName)) {
            console.log('Database does not exist');
            continueProcessing = false;
            return;
        }
        console.log('Database exists');
        const _database = client.db(databaseName);
        const collections = await _database.listCollections().toArray();
        const collectionNames = collections.map(collection => collection.name);
        if (!collectionNames.includes(collectionOne)) {
            console.log('Collection ' + collectionOne + ' does not exist');
            continueProcessing = false;
            return;
        }
        console.log('Collection exists');
        const collection1 = _database.collection(collectionOne);
        const collection2 = _database.collection(collectionTwo);
        console.log('Getting data in batches from ' + collectionOne);
        const sourceDataCount = await collection1.countDocuments();
        console.log('Got ' + sourceDataCount + ' records');
        const batchSize = 5000;
        // script slows down because the distinct query is slow, so we need to do it in batches
        console.log('Getting distinct _id values from ' + collectionTwo);
        const _ids = await collection2.distinct('_id');
        // ObjectIds don't compare with ===, so compare their JSON forms.
        // A Set gives O(1) membership checks; the original Array.includes
        // inside filter() made every batch O(batch * existing-ids).
        const idsToSkip = new Set(_ids.map(val => JSON.stringify(val)));
        console.log('Got an initial ' + idsToSkip.size + ' ids to skip');
        // read the last value of i from a file so we can resume if the script fails
        console.log('Reading lastValue.txt');
        let lastValue = 0;
        try {
            // On the very first run the checkpoint file does not exist yet;
            // treat a missing or unparseable file as "start from 0" instead
            // of crashing (the original readFileSync threw ENOENT here).
            const fileContents = fs.readFileSync('lastValue.txt', 'utf8');
            const parsed = parseInt(fileContents, 10);
            if (!Number.isNaN(parsed)) {
                lastValue = parsed;
            }
        } catch (readError) {
            console.log('No usable lastValue.txt found, starting from 0');
        }
        if (lastValue) {
            console.log('Resuming from last value of ' + lastValue + '\n');
        }
        for (let i = lastValue; i <= sourceDataCount; i += batchSize) {
            // write the last value of i to a file so we can resume if the script fails
            fs.writeFileSync('lastValue.txt', i.toString(), 'utf8');
            console.log('Getting batch ' + i + ' to ' + (i + batchSize) + ' from ' + collectionTwo);
            const startTime = performance.now();
            const data = await collection1.find().skip(i).limit(batchSize).toArray();
            console.log('Got ' + data.length + ' records from ' + collectionOne + ' in ' + (performance.now() - startTime) / 1000 + ' seconds');
            const _time = performance.now();
            console.log('Filtering ' + data.length + ' records to remove duplicates');
            const dataToInsert = data.filter(item => !idsToSkip.has(JSON.stringify(item._id)));
            console.log('Filtered ' + data.length + ' records to ' + dataToInsert.length + ' records in ' + (performance.now() - _time) / 1000 + ' seconds\n');
            if (dataToInsert.length > 0) {
                console.log('Inserting data into ' + collectionTwo + ', ' + dataToInsert.length + ' records');
                await collection2.insertMany(dataToInsert);
                console.log('Inserted ' + dataToInsert.length + ' records into ' + collectionTwo);
            } else {
                console.log('No data to insert, skipping ' + data.length + ' records');
            }
            const endTime = performance.now();
            console.log('Batch ' + i + ' to ' + (i + batchSize) + ' took total time of ' + (endTime - startTime) / 1000 + ' seconds\n');
        }
        console.log('Done');
        continueProcessing = false;
    } catch (e) {
        // Transient failure (e.g. network blip): log it and leave
        // continueProcessing true so runFix() retries after a delay.
        console.error(e);
    } finally {
        // Close the connection to the MongoDB cluster
        await client.close();
    }
}
/**
 * Lists all databases visible to the connected client, logging each name.
 * @param {MongoClient} client - an already-connected MongoDB client.
 * @returns {Promise<Array<{name: string}>>} the database info entries.
 */
const listDatabases = async (client) => {
    // `const` added: the original assignment created an implicit global.
    const databasesList = await client.db().admin().listDatabases();
    console.log("Databases:");
    databasesList.databases.forEach(db => console.log(` - ${db.name}`));
    return databasesList.databases;
};
/**
 * Resolves after roughly `ms` milliseconds; used to pause between retries.
 * @param {number} ms - delay in milliseconds.
 * @returns {Promise<void>}
 */
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment