Last active
May 26, 2022 16:44
-
-
Save Zaggen/bbec82153f45ec60a136dca8e9ed65e7 to your computer and use it in GitHub Desktop.
Node.js load balancer with
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Author: Zaggen - 2017 | |
* version: 0.1 | |
* github: https://github.com/Zaggen | |
* Free to use and modify | |
* */ | |
// Reverse proxy used to forward HTTP and WebSocket traffic to the backends
const httpProxy = require('http-proxy')
const http = require('http')
const proxy = httpProxy.createProxyServer({})
// Bluebird shadows the global Promise in this module; promisifyAll() below needs it
const Promise = require('bluebird')
// REDIS
const redis = require('redis')
const redisConfig = require('./config/local.js').redisConfig
const redisClient = redis.createClient(redisConfig)
// An EventEmitter without an 'error' listener throws on 'error', which would
// take the whole balancer down on a Redis connection problem; log instead.
redisClient.on('error', function(err) {
  console.error('Redis client error')
  console.error(err)
})
const port = 1337 // Same as the one configured on nginx
// Adds the *Async variants (lrangeAsync, lremAsync, ...) used throughout this file
Promise.promisifyAll(redis.RedisClient.prototype)
// Queue system vars/consts
// Number of attempts a request gets before it is answered with a 502
const MAX_ATTEMPS = 5
// Minimum time (ms) between two registry validations
const REGISTRY_CHECK_TIME = 10000
// When the retry queue is full, the oldest queued request is answered with a 502
const MAX_QUEUE_SIZE = 20
// Requests waiting for a retry, as {req, res, retryAttemptsLeft} entries
const queue = []
// Round-robin cursor into serversList
let serverIndex = 0
// Target URLs currently considered alive; the explicit semicolon keeps the
// parenthesised IIFE that follows from ever being read as a continuation.
let serversList;
(async function init() { | |
// Load the registered backends and keep only those that pass the health check.
// validateRegistryAsync returns the filtered list, so its result (not the raw
// registry contents) is what has to be stored in serversList.
serversList = await validateRegistryAsync(await redisClient.lrangeAsync('server-registry', 0, 100))
let lastRegistryCheck = (new Date()).getTime()
const server = http.createServer(function(req, res) {
  // handleRequest is async: without a rejection handler a failure would be an
  // unhandled rejection and the client would wait forever for a response.
  handleRequest(req, res, MAX_ATTEMPS).catch(function(err) {
    console.error('Error')
    console.error(err)
    sendBadGateWay(res)
  })
})
/**
 * Forwards one HTTP request to the next server in round-robin order, or queues
 * it for a retry when no server is currently available.
 *
 * Before balancing, the registry is re-validated if more than
 * REGISTRY_CHECK_TIME ms have passed since the last check. Requests arriving
 * while a validation is running wait on validationLock until it finishes.
 *
 * @param {http.IncomingMessage} req - Incoming request; `retryAttemptsLeft` is stored on it.
 * @param {http.ServerResponse} res - Response associated with `req`.
 * @param {number} retryAttemptsLeft - Attempts this request still has.
 * @returns {Promise<void>}
 */
async function handleRequest(req, res, retryAttemptsLeft) {
  // `while` instead of `if`: re-check after waking up in case another request
  // started a new validation in the meantime.
  while (validationLock.isActive)
    await validationLock.waitForRelease()
  req.retryAttemptsLeft = retryAttemptsLeft
  // If REGISTRY_CHECK_TIME ms have passed since the last check, we validate our registry
  if (((new Date()).getTime() - lastRegistryCheck) > REGISTRY_CHECK_TIME) {
    validationLock.acquire()
    try {
      serversList = await validateRegistryAsync(await redisClient.lrangeAsync('server-registry', 0, 100))
    } catch (err) {
      // Keep balancing with the previous list rather than failing the request
      console.error('Error')
      console.error(err)
    } finally {
      // Always release: a lock left held would block every later request forever.
      // The timestamp is refreshed even on failure so that a broken registry
      // is not re-queried on every single request.
      lastRegistryCheck = (new Date()).getTime()
      validationLock.release()
    }
  }
  // If we have some servers, we process the request
  if (serversList.length > 0) {
    serverIndex = (serverIndex + 1) % serversList.length
    const target = serversList[serverIndex]
    proxy.web(req, res, {target})
  }
  else {
    queueRequest(req, res)
  }
}
/**
 * Minimal in-process lock used while the server registry is being validated.
 * Holders call acquire()/release(); everyone else checks `isActive` and, when
 * it is set, awaits waitForRelease(). Setting `isActive` to exactly `false`
 * wakes every registered waiter in registration order.
 */
const validationLock = {
  _isActive: false,
  _listeners: [],
  get isActive() {
    return this._isActive
  },
  set isActive(state) {
    this._isActive = state
    // Only a strict `false` counts as a release
    if (state === false) {
      this._notifyRelease()
    }
  },
  acquire() {
    this.isActive = true
  },
  release() {
    this.isActive = false
  },
  // Resolves on the next release; it does not check whether the lock is held
  async waitForRelease() {
    return new Promise((wake) => {
      this._onRelease(wake)
    })
  },
  // Registers a callback to be run (once) on the next release
  _onRelease(cb) {
    this._listeners.push(cb)
  },
  // Drains the listener list, invoking each callback as it is removed
  _notifyRelease() {
    for (let next = this._listeners.shift(); next; next = this._listeners.shift()) {
      next()
    }
  }
}
/**
 * Parks a request for a later retry, or answers it with a 502 when it has no
 * attempts left. The number of attempts is read from `req.retryAttemptsLeft`.
 * When the queue is already at MAX_QUEUE_SIZE the oldest queued request is
 * dropped with a 502 to make room.
 *
 * @param {http.IncomingMessage} req
 * @param {http.ServerResponse} res
 */
function queueRequest(req, res) {
  const attemptsLeft = req.retryAttemptsLeft
  // Out of attempts: give up on this request
  if (!(attemptsLeft > 0)) {
    sendBadGateWay(res)
    return
  }
  if (queue.length >= MAX_QUEUE_SIZE) {
    const evicted = queue.shift()
    sendBadGateWay(evicted.res)
  }
  queue.push({req, res, retryAttemptsLeft: attemptsLeft - 1})
  // The fewer attempts remain, the larger the multiplier (and thus the delay)
  scheduleRetry(queue, (MAX_ATTEMPS + 1) - attemptsLeft)
}
/**
 * Answers a request with a 502 Bad Gateway HTML page.
 *
 * If the response has already started (headers sent) or already ended, the
 * status line and headers can no longer be changed, so the response is only
 * terminated and no error page is written into it.
 *
 * @param {http.ServerResponse} res - Response to terminate.
 */
function sendBadGateWay(res) {
  if (res.headersSent || res.writableEnded) {
    res.end()
    return
  }
  res.statusCode = 502
  res.statusMessage = 'Bad Gateway'
  // The body is HTML; say so instead of relying on client-side sniffing
  res.setHeader('Content-Type', 'text/html; charset=utf-8')
  res.end(`
<body style="text-align: center; padding: 10px;">
<h1>502 Server Error</h1>
<hr/>
<p>
The server encountered a temporary error and could not complete your request.
Please try again later and if the problem persists then contact support and explain in detail how and when
the error occurred.
</p>
<p>Thank you for your kind understanding.</p>
</body>
`)
}
// WebSocket upgrades: refresh the server list from the registry and hand the
// socket to the next server in round-robin order.
server.on('upgrade', async function(req, socket, head) {
  try {
    serversList = await redisClient.lrangeAsync('server-registry', 0, 100)
  } catch (err) {
    // Registry unreachable: fall back to the list we already have
    console.error('Error')
    console.error(err)
  }
  if (serversList.length === 0) {
    // Nobody can serve this connection; close it instead of leaving the
    // client's socket open forever.
    socket.destroy()
    return
  }
  serverIndex = (serverIndex + 1) % serversList.length
  const target = serversList[serverIndex]
  proxy.ws(req, socket, head, {target})
})
// Proxy failures: force a registry re-check, then retry the request when the
// target refused the connection, or give up with a 502.
proxy.on('error', function (err, req, res) {
  // This will force a registry check on the next request
  lastRegistryCheck -= REGISTRY_CHECK_TIME
  console.error('Error')
  console.error(err)
  // For failed proxy.ws() calls the third argument is the raw client socket,
  // not an HTTP response: an HTML error page must not be written into it.
  if (!res || typeof res.setHeader !== 'function') {
    if (res && typeof res.destroy === 'function')
      res.destroy()
    return
  }
  const refused = err.code === 'ECONNREFUSED'
  if (refused && req.retryAttemptsLeft === MAX_ATTEMPS) {
    // First failure: retry immediately against the next server.
    // NOTE(review): a request body already streamed to the failed target is
    // presumably not replayed on retry — confirm for non-GET traffic.
    handleRequest(req, res, MAX_ATTEMPS - 1).catch(function(retryErr) {
      console.error('Error')
      console.error(retryErr)
      sendBadGateWay(res)
    })
  }
  else if (refused && req.retryAttemptsLeft > 0) {
    queueRequest(req, res)
  }
  else
    sendBadGateWay(res)
})
/**
 * Schedules one retry: after `200 * delayMultiplier` ms the request at the
 * head of `queue` (if any) is removed and handled again. If the client's
 * connection is gone by then, the response is simply ended.
 *
 * @param {Array<{req: object, res: object, retryAttemptsLeft: number}>} queue - Pending retries.
 * @param {number} delayMultiplier - Multiplier applied to the 200 ms base delay.
 */
function scheduleRetry(queue, delayMultiplier) {
  setTimeout(function() {
    if (queue.length === 0)
      return
    const {req, res, retryAttemptsLeft} = queue.shift()
    // res.socket may be null; treat a missing socket like a destroyed one
    // instead of throwing inside the timer callback.
    const clientGone = !res.socket || res.socket.destroyed
    if (clientGone) {
      // CONNECTION RESET BY CLIENT
      res.end()
      return
    }
    // handleRequest is async; make sure a failure still answers the client
    handleRequest(req, res, retryAttemptsLeft).catch(function(err) {
      console.error('Error')
      console.error(err)
      sendBadGateWay(res)
    })
  }, 200 * delayMultiplier)
}
/**
 * Health-checks every server in the list and returns the ones that are alive,
 * in their original order. Servers that fail the check are also removed from
 * the 'server-registry' list in Redis.
 *
 * All servers are probed concurrently, so the total time is bounded by the
 * slowest probe rather than the sum of all probes.
 *
 * @param {string[]} serversList - Server URLs read from the registry.
 * @returns {Promise<string[]>} The servers that passed the health check.
 */
async function validateRegistryAsync(serversList) {
  const aliveFlags = await Promise.all(serversList.map((server) => testEndPoint(server)))
  const updatedServerList = []
  serversList.forEach((server, position) => {
    if (aliveFlags[position]) {
      updatedServerList.push(server)
      return
    }
    // Removal is not awaited so it does not delay balancing, but a failure
    // must not surface as an unhandled rejection.
    redisClient.lremAsync('server-registry', 0, server).catch((err) => {
      console.error('Error')
      console.error(err)
    })
  })
  return updatedServerList
}
/**
 * Probes a server with an HTTP GET and reports whether it should be
 * considered alive.
 *
 * - Any HTTP response (whatever its status) counts as alive.
 * - A refused connection, an invalid URL, or no activity within `timeoutMs`
 *   counts as dead.
 * - Any other request error counts as alive.
 *
 * The returned promise never rejects.
 *
 * @param {string} url - Server URL to probe.
 * @param {number} [timeoutMs=5000] - Socket inactivity limit before the server is reported dead.
 * @returns {Promise<boolean>} true when the server is considered alive.
 */
async function testEndPoint(url, timeoutMs = 5000) {
  return new Promise((resolve) => {
    try {
      const probe = http.get(url, (response) => {
        // Discard the body so the socket is released
        response.resume()
        resolve(true)
      })
      probe.on('error', (err) => {
        resolve(err.code !== 'ECONNREFUSED')
      })
      probe.setTimeout(timeoutMs, () => {
        // Resolve before destroying: destroy() triggers an 'error' event that
        // would otherwise report the server as alive.
        resolve(false)
        probe.destroy()
      })
    } catch (err) {
      // http.get throws synchronously on a malformed URL
      resolve(false)
    }
  })
}
// Start accepting connections and log the port once the server is bound
const announceListening = () => console.log(`Load balancer listening on port ${port}`)
server.listen(port, announceListening)
})() | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
redisClient.rpush.apply(redisClient, ['server-registry', serverUrl])
(Make sure it is not already there, or use a set instead.) redisClient.lrem('server-registry', 0, serverUrl)