Skip to content

Instantly share code, notes, and snippets.

@gtato
Last active July 18, 2018 12:55
Show Gist options
  • Save gtato/c1afffcc76ef8489d6a318be5103f05e to your computer and use it in GitHub Desktop.
Save gtato/c1afffcc76ef8489d6a318be5103f05e to your computer and use it in GitHub Desktop.
Synchronize only some keys between independent Redis instances
const express = require('express')
const bodyParser = require('body-parser')
var redis = require('redis');
const app = express()
app.use(bodyParser.urlencoded());
app.use(bodyParser.json());
var ptrn = 'sess*'
var redisClients = {}
var secClients = {}
var sessions={}
app.post('/add_redis', function (req, res) {
var host = req.body.host
var port = req.body.port
addRedis(host, port)
res.send('OK')
})
function addRedis(host, port){
var rc = redis.createClient(port, host);
rc.on("error", function(err) {
onConnectionFail(rc, err);
});
rc.on('connect', function() {
onConnect(rc)
});
}
function onConnectionFail(rc, err){
console.error("Error connecting to redis");
rc.quit()
}
function onConnect(rc){
rc.config("SET", "notify-keyspace-events", "Kg\$");
rc.keys(ptrn, function(err, replies){
var keys = replies
rc.mget(replies, function(err, replies){
initalSync(rc, keys, replies)
});
})
}
function initalSync(rc, keys, vals){
console.log('initial sync for %s', rc.address);
var kval = []
for(sess in sessions){
if (keys.indexOf(sess) == -1){
kval.push(sess)
kval.push(sessions[sess])
}
}
if (kval.length > 0) rc.mset(kval);
kval = []
if (keys != null){
for(var i = 0; i < keys.length; i++){
if (!(keys[i] in sessions)){
kval.push(keys[i])
kval.push(vals[i])
sessions[keys[i]] = vals[i]
}
}
}
secClients[rc.address] = redis.createClient('redis://'+rc.address);
if (kval.length > 0) {
for(addr in redisClients){
if(addr == rc.address) continue
secClients[addr].mset(kval);
}
}
rc.psubscribe('__keyspace@0__:'+ptrn);
rc.on('pmessage', function(pattern, channel, message) {
onKeyUpdated(rc, pattern, channel, message)
});
redisClients[rc.address] = rc
console.log('%s connected successfully', rc.address);
}
function onKeyUpdated(rc, pattern, channel, message){
var key = channel.split('__:')[1]
if (message == 'set')
secClients[rc.address].get(key, function(err, reply){onSetKey(rc, key, err, reply)});
if (message == 'del')
onDelKey(rc, key)
}
function onSetKey(crc, key, err, reply){
var cval = reply
if(key in sessions && sessions[key] == cval) return
sessions[key]=cval
for(rcaddress in redisClients){
if (rcaddress == crc.address) continue
secClients[rcaddress].set(key, cval);
}
}
function onDelKey(crc, key){
if(!(key in sessions)) return;
delete sessions[key]
for(rcaddress in redisClients){
if (rcaddress == crc.address) continue
secClients[rcaddress].del(key);
}
}
app.get('/*', function (req, res) {
var sesstxt = 'Sessions:<br>'
for(k in sessions){
sesstxt += k + ' => ' + sessions[k]+'<br>'
}
var redistxt = 'Redises:<br>'
for(addr in redisClients)
redistxt += addr + '<br>'
res.send('<h1>Welcome to Redis syncer !</h1><br>'+redistxt+'<br>'+sesstxt)
})
port = 8006
app.listen(port, () => console.log('Syncer listening on port:' + port))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment