MongoDB Sharded Cluster Orphaned (Duplicate) Document Finder/Remover
/*
 *
 * orphanage.js -- Utility to find and remove orphaned documents
 * 10gen 2012 -- Tyler Brock
 *
 * Script Orphan Finder Procedure:
 * - Set up a connection to each shard
 * - Turn off the balancer
 * - For each chunk of data
 *   - Query each shard that does not own the chunk (according to config.chunks)
 *   - If a non-owning shard contains documents in that chunk's range, they are orphans
 * - Return a list of the orphan document counts for each {shard, chunk}
 *
 * Quick Sharding Setup:
 * - var st = new ShardingTest({ shards: 2, mongos: 1 })
 * - var st = new ShardingTest({ shards: 2, mongos: 1, other: { rs: true }})
 * - var mongos = st.s
 * - var shard0 = st.shard0
 * - var shard1 = st.shard1
 *
 * Usage:
 * - Balancer.stop() -- Stop the balancer
 * - var result = Orphans.find('db.collection') -- Find orphans in a given namespace
 * - Orphans.findAll() -- Find orphans in all namespaces
 * - result.remove() -- Removes orphaned documents from the next bad chunk
 * - result.remove(true) -- Soft remove (sets _removed field to true)
 * - Balancer.start() -- Start the balancer
 *   (see the example session in the comment block below)
 *
 */
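/*
 * Example session -- a minimal sketch of the intended workflow;
 * 'mydb.mycoll' is a placeholder namespace, substitute your own
 * sharded collection:
 *
 *   Balancer.stop()
 *   var result = Orphans.find('mydb.mycoll')
 *   while (result.hasNext()) {
 *       result.next()        // print details about the next bad chunk
 *       result.remove()      // or result.remove(true) for a soft remove
 *   }
 *   Balancer.start()
 */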
var mongos = db.getMongo()
var config = mongos.getDB('config')
// Self Test
var Test = {
    run: function(){
        // Setup new sharding test
        var st = new ShardingTest({ shards: 2, mongos: 1/*, other: { rs: true }*/})
        mongos = st.s
        config = mongos.getDB('config')
        // Stop the balancer
        Balancer.stop()
        assert.eq(true, Balancer.stopped())
        var db = "shard-test"
        var ns = db + "." + "test"
        // Drop existing data
        mongos.getDB(db).dropDatabase()
        mongos.getCollection(ns).ensureIndex({ a: 1, b: 1 })
        // Enable sharding and shard the collection
        st.adminCommand({ enablesharding: db })
        st.adminCommand({ shardcollection: ns, key: { a: 1, b: 1 } })
        // Get connection to a non-authoritative shard for the chunk
        var owner = config.chunks.findOne({ ns: ns }).shard
        var other = config.shards.findOne({ _id: {$ne: owner }})
        var splitPoint = { a: 0, b: new ObjectId() }
        st.adminCommand({ split: ns, middle: splitPoint })
        st.adminCommand({ moveChunk: ns, find: splitPoint, to: other._id })
        st.printShardingStatus()
        // Insert good documents
        var collection = mongos.getCollection(ns)
        collection.insert({ a: -1, b: new ObjectId() })
        collection.insert({ a: -2, b: new ObjectId() })
        assert.eq(collection.getDB().getLastError(), null)
        // Make sure no bad documents in that namespace
        var good = collection.find().count()
        assert.eq(2, good)
        var bad = Orphans.find(ns).count
        assert.eq(0, bad)
        // Insert an orphan directly on the non-authoritative shard
        var connection = new Mongo(other.host)
        collection = connection.getCollection(ns)
        collection.insert({ a: -3, b: new ObjectId() })
        assert.eq(collection.getDB().getLastError(), null)
        // Make sure one bad document in namespace
        bad = Orphans.find(ns).count
        assert.eq(1, bad)
        // Insert another bad doc -- count should be 2
        collection.insert({ a: -4, b: new ObjectId() })
        assert.eq(collection.getDB().getLastError(), null)
        bad = Orphans.find(ns).count
        assert.eq(2, bad)
        // Remove orphans
        var result = Orphans.find(ns)
        var numRemoved = 0
        while (result.hasNext()) {
            //result.next()
            numRemoved += result.remove()
        }
        assert.eq(2, numRemoved)
        // Make sure no orphans still exist
        bad = Orphans.find(ns).count
        assert.eq(0, bad)
        // Make sure we didn't remove any good docs
        collection = mongos.getCollection(ns)
        var good = collection.find().itcount()
        assert.eq(2, good)
        Balancer.start()
        assert.eq(false, Balancer.stopped())
        st.stop()
    }
}
// Balancer object -- controls and retrieves balancer state
// (built-in shell alternatives are noted below the object)
var Balancer = {
    stop: function(){
        config.settings.update(
            { _id: "balancer" },
            { $set : { stopped: true } },
            true )
        // Make sure the balancer has stopped
        while (config.locks.findOne({_id: "balancer"}).state) {
            print("waiting for balancer to stop...")
            sleep(1000)
        }
        sleep(3000)
        // Check again
        while (config.locks.findOne({_id: "balancer"}).state) {
            print("waiting for balancer to stop...")
            sleep(1000)
        }
        // Consider sleeping for 30 seconds or doing additional checks
        print("balancer stopped")
    },
    start: function(){
        config.settings.update(
            { _id: "balancer" },
            { $set : { stopped: false } },
            true )
        print("balancer started")
    },
    stopped: function(){
        var settings = config.settings.findOne({_id: "balancer"})
        var running = config.locks.findOne({_id: "balancer"}).state
        if (settings && settings.stopped && !running) return true
        return false
    }
}
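// Note: the mongo shell also ships balancer helpers that cover the same ground
// as the object above (an alternative sketch, not used by this script):
//   sh.stopBalancer()       // disable the balancer and wait for it to stop
//   sh.getBalancerState()   // returns false when the balancer is disabled
//   sh.startBalancer()      // re-enable the balancer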
// Shard object -- contains shard related functions
var Shard = {
    // Returns an array of sharded namespaces
    namespaces: function(){
        var sharded = config.collections.find()
        var namespaces = []
        while (sharded.hasNext()) {
            namespaces.push(sharded.next()._id)
        }
        return namespaces
    },
    // Returns map of shard names -> shard connections
    connections: function() {
        var shardConnections = {}
        var shards = config.shards.find()
        while (shards.hasNext()) {
            var shard = shards.next()
            shardConnections[shard._id] = new Mongo(shard.host)
        }
        return shardConnections
    },
    // this could be overkill
    hasShardKeyIndex: function(naCollection) {
        // ensure there is an index for the shard key on this shard
        // the shard could have no data and therefore no index
        var key = config.collections.findOne({ _id: naCollection.getFullName() }).key
        var indexes = naCollection.getIndexKeys()
        var ensured = false
        // iterate over index keys for the collection on this shard
        for (var i = 0; i < indexes.length; i++) {
            var satisfied = true
            // iterate over all components of the shard key
            for (var k in key) {
                if (!(k in indexes[i])) {
                    satisfied = false
                    break
                }
            }
            if (satisfied) {
                ensured = true
                break
            }
        }
        return ensured
    }
}
// Orphans object -- finds and removes orphaned documents
var Orphans = {
    find: function(namespace) {
        // Make sure this script is being run on mongos
        assert(config.runCommand({ isdbgrid: 1}).ok, "Not a sharded cluster")
        assert(Balancer.stopped(), "Balancer must be stopped first")
        print("Searching for orphans in namespace [" + namespace + "]")
        var connections = Shard.connections()
        var result = {
            badChunks: [],
            count: 0,
            hasNext: function(){
                if (this.badChunks.length > 0) { return true }
                else { return false }
            },
            next: function(){
                var bchunk = this.badChunks[0]
                print("Calling result.remove() will remove " + bchunk.orphanCount +
                      " orphans from chunk " + bchunk._id + " on " + bchunk.orphanedOn)
                print("Documents for this chunk should only exist on " + bchunk.shard)
            },
            remove: function(soft){
                var bchunk = this.badChunks.splice(0,1)[0]
                print("Removing " + bchunk._id + " from " + bchunk.orphanedOn)
                var naCollection = connections[bchunk.orphanedOn].getCollection(namespace)
                var toRemove = naCollection.find({}, {_id: 1}).min(bchunk.min).max(bchunk.max)
                var idsToRemove = []
                while (toRemove.hasNext()) {
                    idsToRemove.push(toRemove.next()._id)
                }
                // If this is a soft removal just add a _removed boolean
                if (soft) {
                    var field = { }
                    if (typeof(soft) == "string") {
                        field[soft] = true
                    } else {
                        field["_removed"] = true
                    }
                    naCollection.update(
                        { _id: { $in: idsToRemove } },
                        { $set: field },
                        false, true)
                } else {
                    naCollection.remove({ _id: { $in: idsToRemove } })
                }
                var error = naCollection.getDB().getLastError()
                if (error) {
                    print("-> There was an error: " + error)
                } else {
                    print("-> Successfully removed " + idsToRemove.length + " orphaned documents from " + namespace)
                }
                return idsToRemove.length
            }
        }
        // Iterate over the chunks in the collection -- only one shard should own
        // each chunk, so check for copies on non-authoritative shards
        var chunks = config.chunks.find({ ns: namespace })
        while (chunks.hasNext()) {
            var chunk = chunks.next()
            // query all non-authoritative shards
            for (var shard in connections) {
                if (shard != chunk.shard) {
                    // make a connection to the non-authoritative shard
                    var naCollection = connections[shard].getCollection(namespace)
                    // skip shards that have no data yet (missing shard key index)
                    if (!Shard.hasShardKeyIndex(naCollection)) {
                        print("Missing shard key index on shard " + shard)
                        continue
                    }
                    // gather documents that should not exist here
                    var orphanCount = naCollection.find({}, { _id: 1 }).min(chunk.min).max(chunk.max).itcount()
                    result.count += orphanCount
                    if (orphanCount > 0) {
                        chunk.orphanedOn = shard
                        chunk.orphanCount = orphanCount
                        result.badChunks.push(chunk)
                        print("Found " + orphanCount + " orphans for the following chunk on " + shard +
                              " which should only be on " + chunk.shard)
                        print("Chunk Info:")
                        printjson(chunk)
                        //print("Sample document from Chunk:")
                        //printjson(naCollection.find().min(chunk.min).max(chunk.max).limit(1).next())
                    }
                }
            }
        }
        if (result.count > 0) {
            print("-> " + result.count + " orphan(s) found in " + result.badChunks.length +
                  " chunk(s) in namespace [" + namespace + "]\n")
        }
        else {
            print("-> No orphans found in [" + namespace + "]\n")
        }
        return result
    },
    findAll: function(){
        var result = {}
        var namespaces = Shard.namespaces()
        for (var i = 0; i < namespaces.length; i++ ) {
            var namespace = namespaces[i]
            result[namespace] = this.find(namespace)
        }
        return result
    }
}
print("*** Loaded orphanage.js ***")
print("*** This is dangerous -- we are not responsible for data loss ***")
print("*** Run only on a mongos connected to a sharded cluster ***")
print("")
print("usage:")
print("Balancer.stop() -- Stop the balancer")
print("Balancer.start() -- Start the balancer")
print("Orphans.find('db.collection') -- Find orphans in a given namespace")
print("Orphans.findAll() -- Find orphans in all namespaces")
print("")
print("To remove orphaned documents:")
print("var result = Orphans.find('db.collection')")
print("result.hasNext() -- Returns true if ns has more bad chunks")
print("result.next() -- Shows information about the next bad chunk")
print("result.remove() -- Removes orphaned documents from the next bad chunk")
print("result.remove(true) -- Soft remove (sets _removed field to true)")
print("")
@scotthernandez Can you suggest an alternative for M2.6? Thanks!
A more recent version of this unsupported script is available at https://github.com/mongodb/support-tools/tree/master/orphanage
For MongoDB 2.6+, there is the built-in cleanupOrphaned command.
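For reference, a minimal sketch of the cleanupOrphaned loop. It is run against the primary mongod of each shard (not mongos); "test.orders" is a placeholder namespace, and this pattern applies to MongoDB 2.6 through 4.2 (the command's behavior changed in later releases):

var nextKey = {};
var result;
while (nextKey != null) {
    result = db.adminCommand({ cleanupOrphaned: "test.orders", startingFromKey: nextKey });
    if (result.ok != 1) {
        print("cleanupOrphaned failed or timed out:");
        printjson(result);
        break;
    }
    // stoppedAtKey is absent once the final range has been cleaned
    nextKey = result.stoppedAtKey;
}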
This is old, please delete