Last active
January 20, 2016 17:01
-
-
Save dirkmc/967e827779ca7ab8c55f to your computer and use it in GitHub Desktop.
Intercept requests from Amazon ELB and proxy them to a back end service depending on the IP indicated by the PROXY protocol
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Config looks like this: | |
/* | |
proxy: { | |
port: 5000, | |
// Indicates if there is a PROXY protocol enabled Load Balancer | |
// between us and the client | |
useProxyProtocol: true, | |
status: { | |
path: '/status.json', | |
// How many times can a host fail to respond before it is | |
// marked as down (and stops receiving requests) | |
unhealthyThreshold: 5, | |
// The interval between checking each host's status | |
interval: 1000 | |
}, | |
hosts: [ | |
'http://ws1.dasher.im:5000', | |
'http://ws2.dasher.im:5000', | |
'http://ws3.dasher.im:5000', | |
'http://ws4.dasher.im:5000' | |
] | |
}, | |
*/ | |
// iphash.js | |
module.exports = function(ip, seed) { | |
var hash = ip.reduce(function(r, num) { | |
r += parseInt(num, 10); | |
r %= 2147483648; | |
r += (r << 10) | |
r %= 2147483648; | |
r ^= r >> 6; | |
return r; | |
}, seed); | |
hash += hash << 3; | |
hash %= 2147483648; | |
hash ^= hash >> 11; | |
hash += hash << 15; | |
hash %= 2147483648; | |
return hash >>> 0; | |
} | |
// interceptor.js | |
var _ = require('underscore'); | |
var async = require('async'); | |
var config = require('config'); | |
var request = require('superagent'); | |
var conntrack = require('service-base').net.conntrack; | |
var logger = require('service-base').logger; | |
var errorHandler = require('errorhandler'); | |
var httpProxy = require('http-proxy'); | |
var iphash = require('./iphash'); | |
var env = process.env.NODE_ENV || 'development'; | |
var hosts = _.map(config.proxy.hosts, function(hostname) { | |
var host = httpProxy.createProxyServer({ target: hostname }); | |
host.healthy = false; | |
host.unhealthyCount = 0; | |
// If there's an error connecting to the upstream host, | |
// just log it and send a 500 | |
host.on('error', function(err, req, res) { | |
host.healthy = false; | |
logger.error('Error from host ' + hostname); | |
logger.error(err); | |
if(!res) return; | |
sendError(res, 'Could not connect from interceptor to web-socket service'); | |
}); | |
host.checkHealth = function(cb) { | |
var wasHealthy = host.healthy; | |
logger.debug('Checking health of ' + hostname + ' [' + wasHealthy + ']'); | |
request.get(hostname + config.proxy.status.path).timeout(config.proxy.status.interval).end(function(err, res) { | |
host.healthy = !err && res && res.statusCode >= 200 && res.statusCode < 400; | |
if(host.healthy) { | |
var wasDown = (host.unhealthyCount > config.proxy.status.unhealthyThreshold); | |
host.unhealthyCount = 0; | |
if(!wasHealthy) { | |
logger.info('Host ' + hostname + ' is up'); | |
if(wasDown) { | |
logger.info('Bringing ' + hostname + ' back into rotation'); | |
} | |
} | |
} else { | |
host.unhealthyCount++; | |
logger.warn('Could not connect to ' + hostname + '\n' + JSON.stringify(err)); | |
if(host.unhealthyCount == config.proxy.status.unhealthyThreshold + 1) { | |
logger.warn('Marking ' + hostname + ' as down. Directing incoming traffic to other hosts\n'); | |
} | |
} | |
logger.debug(hostname + ' health: ' + host.healthy); | |
cb(); | |
}); | |
}; | |
return host; | |
}); | |
function checkHostsHealth(callback) { | |
async.each(hosts, function(host, cb) { | |
host.checkHealth(cb); | |
}, function(err, data) { | |
callback && callback(); | |
setTimeout(checkHostsHealth, config.proxy.status.interval); | |
}); | |
} | |
var seed = 757146900; | |
function getHash(req) { | |
var ip = getIp(req); | |
return iphash((ip || '').split(/\./g), seed); | |
} | |
function getIp(req) { | |
return req.connection.remoteAddress; | |
} | |
function getHost(req) { | |
var healthyHosts = _.filter(hosts, function(h) { | |
return h.unhealthyCount <= config.proxy.status.unhealthyThreshold; | |
}); | |
return healthyHosts[getHash(req) % healthyHosts.length]; | |
} | |
function sendError(res, msg) { | |
if(!res || !res.writeHead) return; | |
res.writeHead(500, { | |
'Content-Type': 'text/plain' | |
}); | |
res.end(msg); | |
} | |
function sendNoHealthyHosts(res) { | |
sendError(res, '0 / ' + hosts.length + ' healthy hosts'); | |
}; | |
// Note: proxywrap just takes care of the PROXY protocol that Amazon ELB uses | |
// so that it looks like the request IP is from the client rather than the | |
// load balancer | |
var http = require('http'); | |
if(config.proxy.useProxyProtocol) { | |
http = require('proxywrap').proxy(http); | |
} | |
var server = http.createServer(function(req, res) { | |
var host = getHost(req); | |
if(!host) { | |
return sendNoHealthyHosts(res); | |
} | |
logger.info(req.method, req.url); | |
logger.info('sending request from ' + getIp(req) + ' to', host.options.target.href || host.options.target); | |
host.web(req, res); | |
}); | |
// Listen for the `upgrade` event and proxy the | |
// WebSocket requests as well. | |
server.on('upgrade', function (req, socket, head) { | |
var host = getHost(req); | |
if(!host) { | |
return sendNoHealthyHosts(res); | |
} | |
logger.info(req.method, req.url); | |
logger.info('sending UPGRADE from ' + getIp(req) + ' to', host.options.target.href || host.options.target); | |
host.ws(req, socket, head); | |
}); | |
var connections = null; | |
module.exports = { | |
listen: function(callback) { | |
// Before starting check the health of dependent hosts | |
checkHostsHealth(function() { | |
var msg = 'interceptor listening on ' + config.port; | |
msg += ' with ' + hosts.length + ' hosts:\n' + config.proxy.hosts.join('\n'); | |
logger.info(msg); | |
server.listen(config.port, callback); | |
connections = conntrack.track(server); | |
}); | |
return server; | |
}, | |
port: config.port, | |
end: function(callback) { | |
logger.info('closing interceptor'); | |
server.close(function() { | |
logger.info('closed interceptor'); | |
if (callback) callback(); | |
}); | |
connections.closeConnections(2000); | |
} | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment