Created
November 15, 2012 14:38
-
-
Save hackable/4078937 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict'; | |
var fs = require('fs'), | |
util = require('util'), | |
http = require('http'), | |
https = require('https'), | |
httpProxy = require('http-proxy'), | |
cache = require('./cache'), | |
memoryMonitor = require('./memorymonitor'); | |
var rootDir = fs.realpathSync(__dirname + '/../'); | |
var logType = { | |
log: 1, | |
accessLog: 2 | |
}; | |
var log = function (msg, type) { | |
// Send the log data to the master | |
var message = {}; | |
try { | |
message = { | |
type: (type === undefined) ? logType.log : type, | |
from: process.pid, | |
data: msg | |
}; | |
process.send(message); | |
} catch (err) { | |
// Cannot write on the master channel (worker is committing a suicide) | |
// (from the memorymonitor), writing the log directly. | |
if (message.type === 1) { | |
util.log('(worker #' + message.from + ') ' + message.data); | |
} | |
} | |
}; | |
var debug = function (debugMode) { | |
return function (msg) { | |
if (debugMode !== true) { | |
return; | |
} | |
log(msg, logType.log); | |
}; | |
}; | |
// Ignore SIGUSR | |
process.on('SIGUSR1', function () {}); | |
process.on('SIGUSR2', function () {}); | |
function Worker(config) { | |
if (!(this instanceof Worker)) { | |
return new Worker(config); | |
} | |
debug = debug(config.server.debug); | |
this.cache = cache(config, { | |
logHandler: log, | |
debugHandler: debug | |
}); | |
this.runServer(config.server); | |
} | |
Worker.prototype.runServer = function (config) { | |
httpProxy.setMaxSockets(config.maxSockets); | |
var proxyErrorHandler = function (err, req, res) { | |
if (err.code === 'ECONNREFUSED' || | |
err.code === 'ETIMEDOUT' || | |
req.error !== undefined) { | |
// This backend is dead | |
var backendId = req.meta.backendId; | |
this.cache.markDeadBackend(req.meta); | |
if (req.error) { | |
err = req.error; | |
// Clearing the error | |
delete req.error; | |
} | |
log(req.headers.host + ': backend #' + backendId + ' is dead (' + JSON.stringify(err) + ') while handling request for ' + req.url); | |
} else { | |
log(req.headers.host + ': backend #' + req.meta.backendId + ' reported an error (' + JSON.stringify(err) + ') while handling request for ' + req.url); | |
} | |
req.retries = (req.retries === undefined) ? 0 : req.retries + 1; | |
if (!res.connection || res.connection.destroyed === true) { | |
// FIXME: When there is a TCP timeout, the socket of the Response | |
// object is closed. Not possible to return a result after a retry. | |
// BugID:5654 | |
log(req.headers.host + ': Response socket already closed, aborting.'); | |
try { | |
return errorMessage(res, 'Cannot retry on error', 502); | |
} catch (err) { | |
// Even if the client socket is closed, we return an error | |
// to force calling a res.end(). We do it safely in case there | |
// is an error | |
log(req.headers.host + ': Cannot end the request properly (' + err + ').'); | |
} | |
} | |
if (req.retries >= config.retryOnError) { | |
if (config.retryOnError) { | |
log(req.headers.host + ': Retry limit reached (' + config.retryOnError + '), aborting.'); | |
return errorMessage(res, 'Reached max retries limit', 502); | |
} | |
return errorMessage(res, 'Retry on error is disabled', 502); | |
} | |
req.emit('retry'); | |
}.bind(this); | |
var startHandler = function (req, res) { | |
var remoteAddr = getRemoteAddress(req); | |
// TCP timeout to 30 sec | |
req.connection.setTimeout(config.tcpTimeout * 1000); | |
// Make sure the listener won't be set again on retry | |
if (req.connection.listeners('timeout').length < 2) { | |
req.connection.once('timeout', function () { | |
req.error = 'TCP timeout'; | |
}); | |
} | |
// Set forwarded headers | |
if (remoteAddr === null) { | |
return errorMessage(res, 'Cannot read the remote address.'); | |
} | |
if (remoteAddr.slice(0, 2) !== '::') { | |
remoteAddr = '::ffff:' + remoteAddr; | |
} | |
// Play nicely when behind other proxies | |
if (req.headers['x-forwarded-for'] === undefined) { | |
req.headers['x-forwarded-for'] = remoteAddr; | |
} | |
if (req.headers['x-real-ip'] === undefined) { | |
req.headers['x-real-ip'] = remoteAddr; | |
} | |
if (req.headers['x-forwarded-protocol'] === undefined) { | |
req.headers['x-forwarded-protocol'] = req.connection.pair ? 'https' : 'http'; | |
} | |
if (req.headers['x-forwarded-proto'] === undefined) { | |
req.headers['x-forwarded-proto'] = req.connection.pair ? 'https' : 'http'; | |
} | |
if (req.headers['x-forwarded-port'] === undefined) { | |
// FIXME: replace by the real port instead of hardcoding it | |
req.headers['x-forwarded-port'] = req.connection.pair ? '443' : '80'; | |
} | |
}; | |
var getRemoteAddress = function (req) { | |
if (req.connection === undefined) { | |
return null; | |
} | |
if (req.connection.remoteAddress) { | |
return req.connection.remoteAddress; | |
} | |
if (req.connection.socket && req.connection.socket.remoteAddress) { | |
return req.connection.socket.remoteAddress; | |
} | |
return null; | |
}; | |
var errorMessage = function (res, message, code) { | |
// Flag the Response to know that it's an internal error message | |
res.errorMessage = true; | |
if (message === undefined) { | |
message = ''; | |
} | |
code = isNaN(code) ? 400 : parseInt(code, 10); | |
var staticPath = function (name) { | |
return rootDir + '/static/error_' + name + '.html'; | |
}; | |
var serveFile = function (filePath) { | |
var stream = fs.createReadStream(filePath); | |
var headers = { | |
'content-type': 'text/html', | |
'cache-control': 'no-cache', | |
'pragma': 'no-cache', | |
'expires': '-1' | |
}; | |
if (res.debug === true) { | |
headers['x-debug-error'] = message; | |
} | |
res.writeHead(code, headers); | |
stream.on('data', function (data) { | |
res.write(data); | |
}); | |
stream.on('error', function () { | |
res.end(); | |
}); | |
stream.on('end', function () { | |
res.end(); | |
}); | |
}; | |
var serveText = function () { | |
var headers = { | |
'content-length': message.length, | |
'content-type': 'text/plain', | |
'cache-control': 'no-cache', | |
'pragma': 'no-cache', | |
'expires': '-1' | |
}; | |
if (res.debug === true) { | |
headers['x-debug-error'] = message; | |
} | |
res.writeHead(code, headers); | |
res.write(message); | |
res.end(); | |
}; | |
if (code === 200) { | |
// If code is 200, let's just serve the text message since | |
// it's not an error... | |
return serveText(message); | |
} | |
var errorPage = staticPath(code); | |
fs.exists(errorPage, function (exists) { | |
if (exists === true) { | |
return serveFile(errorPage); | |
} | |
errorPage = staticPath('default'); | |
fs.exists(errorPage, function (exists) { | |
if (exists === true) { | |
return serveFile(errorPage); | |
} | |
return serveText(); | |
}); | |
}); | |
}; | |
var getDomainName = function (hostname) { | |
var idx = hostname.lastIndexOf('.'); | |
if (idx < 0) { | |
return hostname; | |
} | |
idx = hostname.lastIndexOf('.', idx - 1); | |
if (idx < 0) { | |
return hostname; | |
} | |
return hostname.substr(idx); | |
}; | |
var middleware = function(res, callback) { | |
var _write = res.write; | |
res.write = function(data) { | |
_write.call(res, data); | |
if (callback) callback(data); | |
} | |
}; | |
var httpRequestHandler = function (req, res) { | |
res.timer = { | |
start: Date.now() | |
}; | |
// Patch the response object | |
(function () { | |
// Enable debug? | |
res.debug = (req.headers['x-debug'] !== undefined); | |
// Patch the res.writeHead to detect backend HTTP errors and handle | |
// debug headers | |
var resWriteHead = res.writeHead; | |
res.writeHead = function (statusCode) { | |
res.timer.end = Date.now(); | |
if (req.meta === undefined) { | |
return resWriteHead.apply(res, arguments); | |
} | |
var markDeadBackend = function () { | |
var backendId = req.meta.backendId; | |
this.cache.markDeadBackend(req.meta); | |
log(req.headers.host + ': backend #' + backendId + ' is dead (HTTP error code ' + | |
statusCode + ') while handling request for ' + req.url); | |
}.bind(this); | |
// If the HTTP status code is 5xx, let's mark the backend as dead | |
// (Except for error 500, which can happen for programming errors, | |
// and mark backend as dead when it's not dead at all) | |
if ((statusCode >= 501 && statusCode < 600) && res.errorMessage !== true) { | |
if (statusCode === 503) { | |
var headers = arguments[arguments.length - 1]; | |
if (typeof headers === 'object') { | |
// Let's lookup the headers to find a "Retry-After" | |
// In this case, this is a legit maintenance mode | |
if (headers['retry-after'] === undefined) { | |
markDeadBackend(); | |
} | |
} | |
} else { | |
// For all other cases, mark the backend as dead | |
markDeadBackend(); | |
} | |
} | |
// If debug is enabled, let's inject the debug headers | |
if (res.debug === true) { | |
res.setHeader('x-debug-backend-url', req.meta.backendUrl); | |
res.setHeader('x-debug-backend-id', req.meta.backendId); | |
res.setHeader('x-debug-vhost', req.meta.virtualHost); | |
res.setHeader('x-debug-frontend-key', req.meta.frontend); | |
res.setHeader('x-debug-time-total', (res.timer.end - res.timer.start)); | |
res.setHeader('x-debug-time-backend', (res.timer.end - res.timer.startBackend)); | |
} | |
return resWriteHead.apply(res, arguments); | |
}.bind(this); | |
// Patch res.end to log the response stats | |
var resEnd = res.end; | |
res.end = function () { | |
var headers = res._header; | |
var backend_header = headers.split("\n"); | |
resEnd.apply(res, arguments); | |
// Number of bytes written on the client socket | |
var socketBytesWritten = req.connection ? req.connection._bytesDispatched : 0; | |
if (req.meta === undefined || | |
req.headers['x-real-ip'] === undefined) { | |
return; // Nothing to log | |
} | |
res.timer.end = Date.now(); | |
// Log the request | |
log({ | |
remoteAddr: req.headers['x-real-ip'], | |
currentTime: res.timer.start, | |
totalTimeSpent: (res.timer.end - res.timer.start), | |
backendTimeSpent: (res.timer.end - res.timer.startBackend), | |
method: req.method, | |
url: req.url, | |
httpVersion: req.httpVersion, | |
statusCode: res.statusCode, | |
socketBytesWritten: socketBytesWritten, | |
referer: req.headers.referer, | |
userAgent: req.headers['user-agent'], | |
virtualHost: req.meta.virtualHost | |
}, logType.accessLog); | |
}; | |
}.bind(this)()); | |
// Proxy the HTTP request | |
var proxyRequest = function () { | |
var buffer = httpProxy.buffer(req); | |
this.cache.getBackendFromHostHeader(req.headers.host, function (err, code, backend) { | |
if (err) { | |
return errorMessage(res, err, code); | |
} | |
req.meta = { | |
backendId: backend.id, | |
backendLen: backend.len, | |
frontend: backend.frontend, | |
virtualHost: backend.virtualHost, | |
backendUrl: backend.href | |
}; | |
// Proxy the request to the backend | |
res.timer.startBackend = Date.now(); | |
var proxy = new httpProxy.HttpProxy({ | |
target: { | |
host: backend.hostname, | |
port: backend.port | |
}, | |
enable: { | |
xforward: false | |
} | |
}); | |
proxy.on('proxyError', proxyErrorHandler); | |
proxy.on('start', startHandler); | |
proxy.proxyRequest(req, res, buffer); | |
middleware(res, function(data) { | |
console.log(data.toString("utf8")); | |
}); | |
}); | |
}.bind(this); | |
if (config.retryOnError) { | |
req.on('retry', function () { | |
log('Retrying on ' + req.headers.host); | |
proxyRequest(); | |
}); | |
} | |
proxyRequest(); | |
}.bind(this); | |
var wsRequestHandler = function (req, socket, head) { | |
var buffer = httpProxy.buffer(socket); | |
this.cache.getBackendFromHostHeader(req.headers.host, function (err, code, backend) { | |
var proxy; | |
if (err) { | |
log('proxyWebSocketRequest: ' + err); | |
return; | |
} | |
// Proxy the WebSocket request to the backend | |
proxy = new httpProxy.HttpProxy({ | |
target: { | |
host: backend.hostname, | |
port: backend.port | |
} | |
}); | |
proxy.proxyWebSocketRequest(req, socket, head, buffer); | |
}); | |
}.bind(this); | |
var monitor = memoryMonitor({ | |
logHandler: log | |
}); | |
// The handler configure the client socket for every new connection | |
var tcpConnectionHandler = function (connection) { | |
var remoteAddress = connection.remoteAddress, | |
remotePort = connection.remotePort, | |
start = Date.now(); | |
var getSocketInfo = function () { | |
return JSON.stringify({ | |
remoteAddress: remoteAddress, | |
remotePort: remotePort, | |
bytesWritten: connection.bytesWritten, | |
bytesRead: connection.bytesRead, | |
elapsed: (Date.now() - start) / 1000 | |
}); | |
}; | |
connection.setKeepAlive(false); | |
connection.setTimeout(config.tcpTimeout * 1000); | |
connection.on('error', function (error) { | |
log('TCP error from ' + getSocketInfo() + '; Error: ' + JSON.stringify(error)); | |
}); | |
connection.on('timeout', function () { | |
log('TCP timeout from ' + getSocketInfo()); | |
connection.destroy(); | |
}); | |
}; | |
if (config.httpKeepAlive !== true) { | |
// Disable the http Agent of the http-proxy library so we force | |
// the proxy to close the connection after each request to the backend | |
httpProxy._getAgent = function () { | |
return false; | |
}; | |
} | |
// Ipv4 | |
var ipv4HttpServer = http.createServer(httpRequestHandler); | |
ipv4HttpServer.on('connection', tcpConnectionHandler); | |
ipv4HttpServer.on('upgrade', wsRequestHandler); | |
ipv4HttpServer.listen(config.port); | |
// Ipv6 | |
var ipv6HttpServer = http.createServer(httpRequestHandler); | |
ipv6HttpServer.on('connection', tcpConnectionHandler); | |
ipv6HttpServer.on('upgrade', wsRequestHandler); | |
ipv6HttpServer.listen(config.port, '::1'); | |
monitor.addServer(ipv4HttpServer); | |
monitor.addServer(ipv6HttpServer); | |
if (config.https) { | |
var options = config.https; | |
options.key = fs.readFileSync(options.key, 'utf8'); | |
options.cert = fs.readFileSync(options.cert, 'utf8'); | |
var ipv4HttpsServer = https.createServer(options, httpRequestHandler); | |
ipv4HttpsServer.on('connection', tcpConnectionHandler); | |
ipv4HttpsServer.on('upgrade', wsRequestHandler); | |
ipv4HttpsServer.listen(config.https.port); | |
var ipv6HttpsServer = https.createServer(options, httpRequestHandler); | |
ipv6HttpsServer.on('connection', tcpConnectionHandler); | |
ipv6HttpsServer.on('upgrade', wsRequestHandler); | |
ipv6HttpsServer.listen(config.https.port, '::1'); | |
monitor.addServer(ipv4HttpsServer); | |
monitor.addServer(ipv6HttpsServer); | |
} | |
}; | |
module.exports = Worker; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment