Created
February 19, 2015 14:53
-
-
Save simontabor/a72dada672fe240ad00d to your computer and use it in GitHub Desktop.
Redis Cluster slot balancing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var async = require('async'); | |
var Redis = require('redis'); | |
/**
 * Rebalance a Redis cluster.
 *
 * 1. Keeps slot allocations in contiguous blocks to maintain a tidy configuration.
 * 2. With `liveBalance`, moves slots to the coldest node from the hottest node, so the
 *    cluster stays healthy whilst rebalancing.
 *
 * @param {{port: number, host: string}} redisConf - address of one cluster node, used to
 *   discover the rest of the cluster
 * @param {string[]} [removeNodes] - "host:port" names of masters to move every slot off of;
 *   empty or omitted spreads the slots across all masters
 * @param {boolean} liveBalance - use the ordered, slot-by-slot plan instead of from->to batches
 */
var Rebalance = function(redisConf, removeNodes, liveBalance) {
  // number of hash slots in a Redis Cluster
  this.numSlots = 16384;
  this.liveBalance = liveBalance;
  // masters to drain completely (empty => spread across all masters)
  this.removeNodes = removeNodes || [];
  this.redisConn = Redis.createClient(redisConf.port, redisConf.host);
};
// Entry point: discover the masters, plan the slot moves, then migrate them one slot at a time.
// cb(err) is called once: after the last move, or with the first error encountered.
Rebalance.prototype.run = function(cb) {
  var self = this;
  self.getNodes(function(err) {
    if (err) return cb(err);
    var slotMoves;
    // Planning throws on unexpected cluster state (e.g. an unallocated slot). We are inside an
    // async callback, so an uncaught throw would crash the process instead of reaching cb.
    try {
      var jobs = self.calculateMoves();
      if (self.liveBalance) {
        // ordered by number of slots for even rebalancing
        // NOTE(review): the original author marked this mode as not working
        slotMoves = self.getOrderedSlotMoves(jobs);
      } else {
        // batched into from->to node groups
        slotMoves = self.getSlotMoves(jobs);
      }
    } catch (planErr) {
      return cb(planErr);
    }
    console.log(slotMoves);
    // strictly one slot at a time
    async.eachSeries(slotMoves, self.moveSlot.bind(self), cb);
  });
};
// List the masters in the cluster (CLUSTER NODES) and open a client to each of them.
// Fills self.nodes, keyed by "host:port", with { name, hash (node ID), client, slots, numSlots }
// and calls cb(err, self.nodes).
Rebalance.prototype.getNodes = function(cb) {
  var self = this;
  self.redisConn.send_command('cluster', [ 'nodes' ], function(err, resp) {
    if (err) return cb(err);
    self.nodes = {};
    resp.split('\n').forEach(function(line) {
      var s = line.split(' ');
      // don't care about slaves, only masters (flags look like "master" or "myself,master")
      if (!s[2] || s[2].indexOf('master') === -1) return;
      // Redis >= 4 reports "ip:port@cport" (>= 7 may append ",hostname"); keep only
      // "host:port" so names match removeNodes and can be used to connect and MIGRATE
      var name = s[1].split('@')[0];
      var sep = name.lastIndexOf(':');
      var host = name.slice(0, sep);
      var port = name.slice(sep + 1);
      // [ '0-400', '6546-7423', '9642' ] => [ [ 0, 400 ], [ 6546, 7423 ], [ 9642, 9642 ] ]
      // "[slot->-nodeid]" / "[slot-<-nodeid]" entries describe a migration in progress, not
      // ownership, and empty fields carry nothing; skip both
      var slots = s.slice(8).filter(function(entry) {
        return entry !== '' && entry.charAt(0) !== '[';
      }).map(function(entry) {
        var bounds = entry.split('-').map(Number);
        return [ bounds[0], bounds.length === 1 ? bounds[0] : bounds[1] ];
      });
      self.nodes[name] = {
        name: name,
        hash: s[0],
        client: Redis.createClient(port, host),
        slots: slots,
        numSlots: slots.reduce(function(total, range) { return total + range[1] - range[0] + 1; }, 0)
      };
    });
    cb(null, self.nodes);
  });
};
// Work out which slots must move where so that every remaining master ends up owning one
// contiguous block of about numSlots / (number of remaining masters) slots.
// Returns jobs of the form { from: nodeName, to: nodeName, slots: [slot, ...] }.
// Throws if no master would be left to hold the slots (every master is in removeNodes, or
// there are none), and — via getSlotRangeNodes — if any slot is unallocated.
Rebalance.prototype.calculateMoves = function() {
  var self = this;
  var nodeNames = Object.keys(self.nodes);
  // the masters that will still own slots afterwards
  var availableNodes = nodeNames.filter(function(n) {
    return self.removeNodes.indexOf(n) === -1;
  });
  // without this guard slotsPerNode is Infinity and nothing is planned, so a request to
  // empty every master would silently "succeed" without moving anything
  if (!availableNodes.length) {
    throw new Error('No masters left to hold the slots; refusing to remove every node');
  }
  var slotsPerNode = self.numSlots / availableNodes.length;
  // the block slot ranges we want to end up with, e.g. 3 masters =>
  // [ 0, 5460 ], [ 5461, 10922 ], [ 10923, 16383 ]
  var slotRanges = [];
  for (var i = 0; i < availableNodes.length; i++) {
    slotRanges.push([ Math.round(slotsPerNode * i), Math.round(slotsPerNode * (i + 1)) - 1 ]);
  }
  // slotMap[slot] = name of the node currently owning that slot
  var slotMap = new Array(self.numSlots);
  nodeNames.forEach(function(name) {
    self.nodes[name].slots.forEach(function(range) {
      for (var slot = range[0]; slot <= range[1]; slot++) slotMap[slot] = name;
    });
  });
  // nodes already chosen as the owner of an earlier range
  var selectedNodes = [];
  var jobs = [];
  slotRanges.forEach(function(slotRange) {
    var nodeSlots = self.getSlotRangeNodes(slotRange, slotMap);
    // hand the range to the eligible node already owning most of it, so fewest slots move
    var owner = self.getNodeWithMostSlots(nodeSlots, selectedNodes);
    selectedNodes.push(owner);
    Object.keys(nodeSlots).forEach(function(name) {
      // nothing to move from the new owner to itself, or from nodes owning none of the range
      if (name === owner || !nodeSlots[name].length) return;
      jobs.push({
        from: name,
        to: owner,
        slots: nodeSlots[name]
      });
    });
  });
  return jobs;
};
// Bucket every slot of the inclusive range [range[0], range[1]] by its current owner.
// Returns { nodeName: [slot, ...] } with an entry (possibly empty) for every known node.
// Throws if slotMap records no owner for a slot in the range.
Rebalance.prototype.getSlotRangeNodes = function(range, slotMap) {
  var owners = {};
  Object.keys(this.nodes).forEach(function(name) {
    owners[name] = [];
  });
  var slot = range[0];
  while (slot <= range[1]) {
    var owner = slotMap[slot];
    if (!owner) throw new Error('Unallocated slot ' + slot + ', I\'m not sure what to do');
    owners[owner].push(slot);
    slot++;
  }
  return owners;
};
// Pick the node owning the most slots in nodeSlots ({ nodeName: [slot, ...] }), ignoring
// nodes being removed and any listed in `exclude`. On a tie the node appearing first in
// nodeSlots wins; returns undefined when no candidate remains.
Rebalance.prototype.getNodeWithMostSlots = function(nodeSlots, exclude) {
  var removeNodes = this.removeNodes;
  var best;
  Object.keys(nodeSlots).forEach(function(name) {
    var eligible = removeNodes.indexOf(name) === -1 && exclude.indexOf(name) === -1;
    if (!eligible) return;
    if (best === undefined || nodeSlots[name].length > nodeSlots[best].length) best = name;
  });
  return best;
};
// Flatten jobs into single-slot moves ordered to keep the cluster balanced as it goes:
// every step feeds the target that currently owns the fewest slots, taking a slot from
// whichever of that target's sources currently owns the most (highest slot number first).
// Returns [{ from, to, slot }, ...]. Slot counts are tracked in a private copy, so neither
// self.nodes nor the jobs' slot arrays are modified.
Rebalance.prototype.getOrderedSlotMoves = function(jobs) {
  var self = this;
  // running slot count per node, updated as each move is planned
  var counts = {};
  Object.keys(self.nodes).forEach(function(name) {
    counts[name] = self.nodes[name].numSlots;
  });
  // pending[to][from] = slots still to move from `from` into `to`;
  // left[to] = total number of slots `to` is still waiting for
  var pending = {};
  var left = {};
  jobs.forEach(function(job) {
    if (!pending[job.to]) {
      pending[job.to] = {};
      left[job.to] = 0;
    }
    pending[job.to][job.from] = (pending[job.to][job.from] || []).concat(job.slots);
    left[job.to] += job.slots.length;
  });
  var targets = Object.keys(pending);
  var slotMoves = [];
  // Iterative on purpose: recursing once per planned slot could exhaust the call stack
  // when thousands of slots move.
  for (;;) {
    // coldest target still waiting for slots (first in key order wins a tie)
    var to = null;
    for (var t = 0; t < targets.length; t++) {
      if (left[targets[t]] > 0 && (to === null || counts[targets[t]] < counts[to])) to = targets[t];
    }
    if (to === null) break;
    // hottest source still owing this target a slot
    var from = null;
    var sources = Object.keys(pending[to]);
    for (var s = 0; s < sources.length; s++) {
      if (pending[to][sources[s]].length && (from === null || counts[sources[s]] >= counts[from])) {
        from = sources[s];
      }
    }
    var slot = pending[to][from].pop();
    left[to]--;
    // keep counts current so the next pick sees this move
    counts[to]++;
    counts[from]--;
    slotMoves.push({
      from: from,
      to: to,
      slot: slot
    });
  }
  return slotMoves;
};
// Flatten jobs into single-slot moves without reordering, so each job's slots stay together
// (batched into from->to blocks, in job order). Returns [{ from, to, slot }, ...].
Rebalance.prototype.getSlotMoves = function(jobs) {
  return jobs.reduce(function(moves, job) {
    job.slots.forEach(function(slot) {
      moves.push({ from: job.from, to: job.to, slot: slot });
    });
    return moves;
  }, []);
};
// Move one slot from job.from to job.to: mark it IMPORTING on the target, then MIGRATING on
// the source, move its keys, and finally assign the slot to the target on every master in
// self.nodes. cb(err) on completion or first failure.
Rebalance.prototype.moveSlot = function(job, cb) {
  var self = this;
  var slot = job.slot;
  var target = self.nodes[job.to];
  var source = self.nodes[job.from];
  console.log('MOVING SLOT ' + slot + ' from ' + job.from + ' to ' + job.to);
  // Order matters (see CLUSTER SETSLOT): the target must be ready to accept ASK redirections
  // before the source starts issuing them, so these run in series, not in parallel.
  async.series([
    function(done) { target.client.send_command('cluster', [ 'setslot', slot, 'importing', source.hash ], done); },
    function(done) { source.client.send_command('cluster', [ 'setslot', slot, 'migrating', target.hash ], done); }
  ], function(err) {
    if (err) return cb(err);
    self.moveKeysInSlot(job, function(err) {
      if (err) return cb(err);
      // we've moved all keys out of the slot; tell the nodes the target now owns it
      async.each(Object.keys(self.nodes), function(n, done) {
        self.nodes[n].client.send_command('cluster', [ 'setslot', slot, 'node', target.hash ], done);
      }, cb);
    });
  });
};
// Drain job.slot on the source node. Fetch up to 20 keys at a time with CLUSTER GETKEYSINSLOT,
// MIGRATE them one by one (db 0, timeout 15000) to the host:port in job.to, and repeat until a
// fetch comes back empty. cb(err) once the slot is empty or on the first failure.
Rebalance.prototype.moveKeysInSlot = function(job, cb) {
  var self = this;
  var sourceClient = self.nodes[job.from].client;
  var targetAddr = job.to.split(':');

  var migrateKey = function(key, next) {
    console.log('Moving ' + key + ' from ' + job.from + ' to ' + job.to);
    sourceClient.send_command('migrate', [ targetAddr[0], targetAddr[1], key, 0, 15000 ], next);
  };

  sourceClient.send_command('cluster', [ 'getkeysinslot', job.slot, 20 ], function(err, keys) {
    if (err) return cb(err);
    // an empty batch means the slot has been drained
    if (!keys.length) return cb();
    async.eachLimit(keys, 1, migrateKey, function(batchErr) {
      if (batchErr) return cb(batchErr);
      // fetch and migrate the next batch
      self.moveKeysInSlot(job, cb);
    });
  });
};
// Command-line entry point. Configuration comes from environment variables. With none set it
// behaves like the original hard-coded setup: a block rebalance that empties 11.11.12.71:6380,
// connecting through 11.11.12.70:6380.
//   REDIS_HOST, REDIS_PORT  any node of the cluster
//   REMOVE_NODES            comma-separated "host:port" masters to move every slot off of;
//                           set it to an empty string to spread slots across every master
//   LIVE_BALANCE            "true" to use the ordered slot-by-slot plan
// Modes as reported by the original author:
//   block rebalance, removing a node (works):                  REMOVE_NODES=11.11.12.71:6380
//   block rebalance, sharing across all nodes (works):         REMOVE_NODES=
//   live rebalance, removing a node (doesn't work):            REMOVE_NODES=11.11.12.71:6380 LIVE_BALANCE=true
//   live rebalance, sharing across all nodes (doesn't work):   REMOVE_NODES= LIVE_BALANCE=true
var removeList = process.env.REMOVE_NODES === undefined
  ? [ '11.11.12.71:6380' ]
  : process.env.REMOVE_NODES.split(',').map(function(n) { return n.trim(); }).filter(Boolean);
var r = new Rebalance({
  port: Number(process.env.REDIS_PORT || 6380),
  host: process.env.REDIS_HOST || '11.11.12.70'
}, removeList, process.env.LIVE_BALANCE === 'true');
r.run(function(err) {
  if (err) {
    console.error(err, 'Rebalance FAILED!');
  } else {
    console.log('Done');
  }
  // the Redis connections opened by Rebalance would otherwise keep the process running;
  // exit explicitly, with a non-zero status on failure
  process.exit(err ? 1 : 0);
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment