Skip to content

Instantly share code, notes, and snippets.

@dirkmc
Created October 29, 2014 00:38
Show Gist options
  • Save dirkmc/3d0ee2d7ae637815586a to your computer and use it in GitHub Desktop.
Save dirkmc/3d0ee2d7ae637815586a to your computer and use it in GitHub Desktop.
var _ = require('underscore');
var async = require('async');
var config = require('config');
var request = require('superagent');
var conntrack = require('service-base').net.conntrack;
var logger = require('service-base').logger;
var errorHandler = require('errorhandler');
var httpProxy = require('http-proxy');
var iphash = require('./iphash');
var env = process.env.NODE_ENV || 'development';
var hosts = _.map(config.proxy.hosts, function(hostname) {
var host = httpProxy.createProxyServer({ target: hostname });
host.healthy = false;
host.unhealthyCount = 0;
// If there's an error connecting to the upstream host,
// just log it and send a 500
host.on('error', function(err, req, res) {
host.healthy = false;
logger.error('Error from host ' + hostname);
logger.error(err);
if(!res) return;
sendError(res, 'Could not connect from interceptor to web-socket service');
});
host.checkHealth = function(cb) {
var wasHealthy = host.healthy;
logger.debug('Checking health of ' + hostname + ' [' + wasHealthy + ']');
request.get(hostname + config.proxy.status.path).timeout(config.proxy.status.interval).end(function(err, res) {
host.healthy = !err && res && res.statusCode >= 200 && res.statusCode < 400;
if(host.healthy) {
var wasDown = (host.unhealthyCount > config.proxy.status.unhealthyThreshold);
host.unhealthyCount = 0;
if(!wasHealthy) {
logger.info('Host ' + hostname + ' is up');
if(wasDown) {
logger.info('Bringing ' + hostname + ' back into rotation');
}
}
} else {
host.unhealthyCount++;
logger.warn('Could not connect to ' + hostname + '\n' + JSON.stringify(err));
if(host.unhealthyCount == config.proxy.status.unhealthyThreshold + 1) {
logger.warn('Marking ' + hostname + ' as down. Directing incoming traffic to other hosts\n');
}
}
logger.debug(hostname + ' health: ' + host.healthy);
cb();
});
};
return host;
});
function checkHostsHealth(callback) {
async.each(hosts, function(host, cb) {
host.checkHealth(cb);
}, function(err, data) {
callback && callback();
setTimeout(checkHostsHealth, config.proxy.status.interval);
});
}
var seed = 757146900;
function getHash(req) {
var ip = getIp(req);
return iphash((ip || '').split(/\./g), seed);
}
function getIp(req) {
return req.connection.remoteAddress;
}
function getHost(req) {
var healthyHosts = _.filter(hosts, function(h) {
return h.unhealthyCount <= config.proxy.status.unhealthyThreshold;
});
return healthyHosts[getHash(req) % healthyHosts.length];
}
function sendError(res, msg) {
res.writeHead(500, {
'Content-Type': 'text/plain'
});
res.end(msg);
}
function sendNoHealthyHosts(res) {
sendError(res, '0 / ' + hosts.length + ' healthy hosts');
};
// Note: proxywrap just takes care of the PROXY protocol that Amazon ELB uses
// so that it looks like the request IP is from the client rather than the
// load balancer
var http = require('http');
if(config.proxy.useProxyProtocol) {
http = require('proxywrap').proxy(http);
}
var server = http.createServer(function(req, res) {
var host = getHost(req);
if(!host) {
return sendNoHealthyHosts(res);
}
logger.info(req.method, req.url);
logger.info('sending request from ' + getIp(req) + ' to', host.options.target.href || host.options.target);
host.web(req, res);
});
// Listen for the `upgrade` event and proxy the
// WebSocket requests as well.
server.on('upgrade', function (req, socket, head) {
var host = getHost(req);
if(!host) {
return sendNoHealthyHosts(res);
}
logger.info(req.method, req.url);
logger.info('sending UPGRADE from ' + getIp(req) + ' to', host.options.target.href || host.options.target);
host.ws(req, socket, head);
});
var connections = null;
module.exports = {
listen: function(callback) {
// Before starting check the health of dependent hosts
checkHostsHealth(function() {
var msg = 'interceptor listening on ' + config.port;
msg += ' with ' + hosts.length + ' hosts:\n' + config.proxy.hosts.join('\n');
logger.info(msg);
server.listen(config.port, callback);
connections = conntrack.track(server);
});
return server;
},
port: config.port,
end: function(callback) {
logger.info('closing interceptor');
server.close(function() {
logger.info('closed interceptor');
if (callback) callback();
});
connections.closeConnections(2000);
}
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment