Skip to content

Instantly share code, notes, and snippets.

@fabware
Created June 8, 2011 03:23
Show Gist options
  • Save fabware/1013711 to your computer and use it in GitHub Desktop.
Save fabware/1013711 to your computer and use it in GitHub Desktop.
socket.io application and haproxy configuration
var express = require('express');
var redis = require('redis');
// Command-line arguments, read positionally:
//   argv[2] = server type ('user' or 'channel'), argv[3] = bind host, argv[4] = bind port.
const serverType = process.argv[2];
const serverHost = process.argv[3];
// Pass the radix explicitly so the port is always read as a decimal number.
const serverPort = parseInt(process.argv[4], 10);
const redisPort = 6379;
const redisHost = '127.0.0.1';
// createClient is a factory that returns the client object, so `new` is not needed.
var rclient = redis.createClient(redisPort, redisHost);
/**
 * Stores `host` in redis under the key 'MUPH:' + userId.
 *
 * @param {string} userId - Identifier appended to the 'MUPH:' key prefix.
 * @param {string} host - Value written to that key.
 */
function writeUserMessageHost(userId, host){
    var redisKey = 'MUPH:' + userId;
    rclient.set(redisKey, host);
}
/**
 * Removes the redis key 'MUPH:' + userId.
 *
 * The redis client exposes the DEL command as `del`; there is no `delete`
 * method on the client, so the previous call threw a TypeError.
 *
 * @param {string} userId - Identifier appended to the 'MUPH:' key prefix.
 */
function deleteUserMessageHost(userId){
    rclient.del('MUPH:' + userId);
}
/**
 * Adds `host` to the redis set stored under the key 'MCMH:' + channel.
 *
 * @param {string} channel - Identifier appended to the 'MCMH:' key prefix.
 * @param {string} host - Member added to the set.
 */
function writeChannelMessageHost(channel, host){
    var redisKey = 'MCMH:' + channel;
    rclient.sadd(redisKey, host);
}
/**
 * Registry holding at most one client per listenerId.
 *
 * `listeners` maps listenerId -> client and `listenersCount` is the number of
 * registered ids. The map has no prototype so that ids such as 'constructor'
 * or '__proto__' behave like any other key.
 */
function ListenerContainer(){
    this.listeners = Object.create(null);
    this.listenersCount = 0;
}

/**
 * Registers `client` under client.listenerId and records this server's
 * host:port for that id via writeUserMessageHost.
 *
 * Re-registering an id replaces the stored client without incrementing the
 * counter, so the counter keeps matching the number of stored ids.
 *
 * @param {Object} client - Object exposing a `listenerId` property.
 */
ListenerContainer.prototype.addClient = function(client){
    var id = client.listenerId;
    if (!(id in this.listeners)){
        this.listenersCount += 1;
    }
    this.listeners[id] = client;
    writeUserMessageHost(id, serverHost+':'+serverPort);
};

/**
 * Unregisters `client` and deletes its host record via deleteUserMessageHost.
 *
 * Nothing happens unless `client` is the object currently stored for its id:
 * a late disconnect from a replaced connection must not evict the newer
 * client, and removing an unknown client must not push the counter negative.
 *
 * @param {Object} client - The object previously passed to addClient.
 */
ListenerContainer.prototype.removeClient = function(client){
    var id = client.listenerId;
    if (this.listeners[id] !== client){
        return;
    }
    delete this.listeners[id];
    this.listenersCount -= 1;
    deleteUserMessageHost(id);
};

/**
 * Looks up the client registered for `listenerId`.
 *
 * @param {string} listenerId - Id to look up.
 * @returns {Object} A map with the single entry listenerId -> client, or an
 *     empty object when no client is registered for that id.
 */
ListenerContainer.prototype.getClients = function(listenerId){
    var client = this.listeners[listenerId];
    if (client == null){
        return {};
    }
    // Key the entry by the id's value; `{listenerId: client}` would use the
    // literal string 'listenerId' as the key.
    var found = Object.create(null);
    found[listenerId] = client;
    return found;
};

/**
 * @param {string} [listenerId] - Accepted for signature parity with
 *     MultiListenerContainer.count; not used here.
 * @returns {number} Number of registered listener ids.
 */
ListenerContainer.prototype.count = function(listenerId){
    return this.listenersCount;
};

/**
 * Counts the keys of `listener.clients` on the first registered client.
 *
 * @returns {number} That key count, or 0 when no client is registered or the
 *     first client has no `listener`.
 */
ListenerContainer.prototype.clientsCount = function(){
    var firstListener = null;
    for (var id in this.listeners){
        firstListener = this.listeners[id].listener;
        break;
    }
    var total = 0;
    if (firstListener){
        for (var key in firstListener.clients){
            total += 1;
        }
    }
    return total;
};
/**
 * Registry holding many clients per listenerId (channel).
 *
 * `listeners` maps listenerId -> (sessionId -> client). `listenersCount` maps
 * listenerId -> number of sessions, plus the aggregate under '__total'.
 * All maps have no prototype so ids such as 'constructor' or '__proto__'
 * behave like any other key.
 *
 * Known limitation: a channel literally named '__total' shares the aggregate
 * slot of `listenersCount`.
 */
function MultiListenerContainer() {
    this.listeners = Object.create(null);
    this.listenersCount = Object.create(null);
    this.listenersCount['__total'] = 0;
}

/**
 * Registers `client` under (client.listenerId, client.sessionId) and records
 * this server's host:port for the channel via writeChannelMessageHost.
 * A session that is already registered is not counted twice.
 *
 * @param {Object} client - Object exposing `listenerId` and `sessionId`.
 */
MultiListenerContainer.prototype.addClient = function(client){
    var channelId = client.listenerId;
    var sessions = this.listeners[channelId];
    if (!sessions){
        sessions = this.listeners[channelId] = Object.create(null);
    }
    if (!(channelId in this.listenersCount)){
        this.listenersCount[channelId] = 0;
    }
    if (!(client.sessionId in sessions)){
        sessions[client.sessionId] = client;
        this.listenersCount[channelId] += 1;
        this.listenersCount['__total'] += 1;
    }
    writeChannelMessageHost(channelId, serverHost+':'+serverPort);
};

/**
 * Unregisters the session identified by (client.listenerId, client.sessionId).
 * Unknown sessions are ignored. When a channel's last session leaves, the
 * channel's entries are dropped so the maps do not grow without bound.
 *
 * @param {Object} client - Object exposing `listenerId` and `sessionId`.
 */
MultiListenerContainer.prototype.removeClient = function(client){
    var channelId = client.listenerId;
    var sessions = this.listeners[channelId];
    if (!sessions || !(client.sessionId in sessions)){
        return;
    }
    delete sessions[client.sessionId];
    this.listenersCount[channelId] -= 1;
    this.listenersCount['__total'] -= 1;
    // Never delete the aggregate slot, even for a channel named '__total'.
    if (channelId !== '__total' && this.listenersCount[channelId] <= 0){
        delete this.listeners[channelId];
        delete this.listenersCount[channelId];
    }
};

/**
 * @param {string} listenerId - Channel to look up.
 * @returns {Object} Map of sessionId -> client for the channel, or an empty
 *     object when the channel has no registered sessions.
 */
MultiListenerContainer.prototype.getClients = function(listenerId){
    return this.listeners[listenerId] || {};
};

/**
 * @param {string} [listenerId] - Channel to count.
 * @returns {number|Object} With a truthy `listenerId`: that channel's session
 *     count (0 when unknown). Otherwise the whole `listenersCount` map,
 *     including the '__total' entry.
 */
MultiListenerContainer.prototype.count = function(listenerId){
    if (!listenerId){
        return this.listenersCount;
    }
    if (listenerId in this.listenersCount){
        return this.listenersCount[listenerId];
    }
    return 0;
};

/**
 * Counts the keys of `listener.clients` on the first registered client found.
 *
 * @returns {number} That key count, or 0 when no client is registered or the
 *     client found has no `listener`.
 */
MultiListenerContainer.prototype.clientsCount = function(){
    var firstListener = null;
    // Labeled break: stop both loops as soon as one client has been seen.
    search:
    for (var channelId in this.listeners){
        for (var sessionId in this.listeners[channelId]){
            firstListener = this.listeners[channelId][sessionId].listener;
            break search;
        }
    }
    var total = 0;
    if (firstListener){
        for (var key in firstListener.clients){
            total += 1;
        }
    }
    return total;
};
// One registry per message target: single-connection users and multi-session channels.
var userContainer = new ListenerContainer();
var channelContainer = new MultiListenerContainer();
// The HTTP server is also this module's export.
var server = express.createServer();
module.exports = server;
/**
 * GET /post/:msg_target/:msg_target_id/:msg
 *
 * Decodes the base64 `msg` parameter to a UTF-8 string and sends it to every
 * client that the selected container returns for `msg_target_id`.
 * `msg_target` selects the container: 'channel' or 'user'. Any other value
 * is answered with a 404 instead of dereferencing an undefined container.
 *
 * Responds with the number of clients the message was sent to, as a string.
 */
server.get('/post/:msg_target/:msg_target_id/:msg', function(req, res){
    var container;
    if (req.params.msg_target === 'channel'){
        container = channelContainer;
    }else if (req.params.msg_target === 'user'){
        container = userContainer;
    }else{
        res.statusCode = 404;
        res.send('unknown message target');
        return;
    }
    // The payload is the same for every recipient, so decode it once.
    var msgStr = new Buffer(req.params.msg, 'base64').toString('utf-8');
    var clients = container.getClients(req.params.msg_target_id);
    var sentCounter = 0;
    for (var key in clients){
        clients[key].send(msgStr);
        sentCounter += 1;
    }
    res.send(''+sentCounter);
});
/**
 * GET /stats/count
 *
 * Responds with a JSON string holding, for the user and channel containers,
 * the result of count() and of clientsCount().
 */
server.get('/stats/count', function(req, res){
    var stats = {
        'user': userContainer.count(),
        'channel': channelContainer.count(),
        'user_clients': userContainer.clientsCount(),
        'channel_clients': channelContainer.clientsCount()
    };
    res.send(JSON.stringify(stats));
});
/**
 * GET /debug/check/session/:session_id
 *
 * Scans both containers for a client whose session id equals `session_id`
 * and responds with where it was found, or 'not found'. Every entry is
 * scanned, so when several match, the last one visited is reported
 * (channels are scanned after users).
 */
server.get('/debug/check/session/:session_id', function(req, res){
    var wanted = req.params.session_id;
    var found = 'not found';
    // Loop variables are declared locally; they previously leaked as globals.
    for (var userId in userContainer.listeners){
        // Loose equality kept on purpose: the type of client.sessionId is not
        // visible here and the route parameter is always a string.
        if (userContainer.listeners[userId].sessionId == wanted){
            found = 'found in user: '+userId;
        }
    }
    for (var channel in channelContainer.listeners){
        for (var sessionId in channelContainer.listeners[channel]){
            if (sessionId == wanted){
                found = 'found in channel: '+channel;
            }
        }
    }
    res.send(found);
});
/**
 * Returns the third '/'-separated piece of `url`. For a url that starts
 * with '/', this is the second path segment.
 *
 * @param {string} url - Request url to split.
 * @returns {string|undefined} The piece at index 2, or undefined when the
 *     url has fewer than three pieces.
 */
function getRequestExtra(url){
    var pieces = url.split('/');
    return pieces[2];
}
const io = require('socket.io');
/**
 * Attaches a socket.io listener for the 'channelmsg' resource to `server`
 * and registers each connecting client in `channelContainer`.
 *
 * The channel id is the part before the first '_' of the url piece returned
 * by getRequestExtra. Connections whose url yields no channel id are logged
 * and left unregistered, so a malformed url cannot throw inside the
 * connection handler.
 */
function serveChannelMessage(){
    console.log('starting channel server...');
    const socket = io.listen(server, {'resource': 'channelmsg',
                                      'flashPolicyServer': false,
                                      'transports': ['flashsocket']});
    socket.on('connection', function(client){
        var extra = getRequestExtra(client.request.url);
        var channelId = extra ? extra.split('_')[0] : '';
        if (!channelId){
            console.log('channel connection without channel id:', client.request.url);
            return;
        }
        client.listenerId = channelId;
        channelContainer.addClient(client);
        client.on('message', function(msg){
            // Incoming client messages are intentionally ignored.
        });
        client.on('disconnect', function(){
            // Remove exactly the object that was registered above.
            channelContainer.removeClient(client);
        });
    });
}
/**
 * Attaches a socket.io listener for the 'usermsg' resource to `server` and
 * registers each connecting client in `userContainer`.
 *
 * The listener id is the url piece returned by getRequestExtra. Connections
 * whose url yields no id are logged and left unregistered, so no client is
 * ever stored under the key 'undefined'.
 */
function serveUserMessage(){
    const socket = io.listen(server, {'resource': 'usermsg',
                                      'flashPolicyServer': false,
                                      'transports': ['flashsocket']});
    socket.on('connection', function(client){
        var userId = getRequestExtra(client.request.url);
        if (!userId){
            console.log('user connection without user id:', client.request.url);
            return;
        }
        client.listenerId = userId;
        userContainer.addClient(client);
        client.on('message', function(msg){
            // Incoming client messages are intentionally ignored.
        });
        client.on('disconnect', function(){
            // Remove exactly the object that was registered above.
            userContainer.removeClient(client);
        });
    });
}
// Attach the socket.io endpoint selected by the first command-line argument;
// any other value attaches none. The HTTP server is started in every case.
switch (serverType){
case 'user':
    serveUserMessage();
    break;
case 'channel':
    serveChannelMessage();
    break;
}
server.listen(serverPort, serverHost);
# HAProxy process-wide section: connection ceiling, pid file location and the
# user/group the process runs as. The `daemon` and `debug` directives are
# present but commented out.
global
# daemon
maxconn 200000
pidfile /var/run/haproxy.pid
user nobody
group nobody
# debug
# Defaults for the sections below: HTTP mode unless a section sets its own
# `mode`, and a statistics page at /haproxy_stats. The three `option` lines
# are commented out.
defaults
mode http
# option http-server-close
# option http-pretend-keepalive
# option httplog
stats uri /haproxy_stats
# TCP-mode frontend on port 843 (all interfaces); every connection goes to
# the flashpolicy_servers backend.
# NOTE(review): timeout values carry no unit suffix; presumably milliseconds,
# which would make 86400000 equal to 24 hours. Confirm against the HAProxy
# version in use.
frontend flashpolicy 0.0.0.0:843
mode tcp
maxconn 20000
timeout client 86400000
default_backend flashpolicy_servers
# TCP-mode backend: round-robin over five servers on 10.0.0.55, ports
# 10000-10004, each health-checked and capped at 4000 connections.
# NOTE(review): `timeout connect` has the same value as `timeout server`; if
# the unit is milliseconds, a connection attempt may wait up to 24 hours
# before failing. Confirm this is intended.
backend flashpolicy_servers
mode tcp
balance roundrobin
timeout server 86400000
timeout connect 86400000
server fp1 10.0.0.55:10000 weight 1 maxconn 4000 check
server fp2 10.0.0.55:10001 weight 1 maxconn 4000 check
server fp3 10.0.0.55:10002 weight 1 maxconn 4000 check
server fp4 10.0.0.55:10003 weight 1 maxconn 4000 check
server fp5 10.0.0.55:10004 weight 1 maxconn 4000 check
# Frontend on port 30000 (all interfaces). Requests are routed by two path
# ACLs, `usermsg` and `channelmsg`, to the matching stream backend.
# No default_backend is declared in this section, so a request matching
# neither ACL has no backend selected here.
frontend all 0.0.0.0:30000
maxconn 200000
timeout client 86400000
acl is_usermsg_stream path_dir usermsg
use_backend usermsg_stream_servers if is_usermsg_stream
acl is_channelmsg_stream path_dir channelmsg
use_backend channelmsg_stream_servers if is_channelmsg_stream
# Backend for `usermsg` requests: round-robin over two active servers on
# 10.0.0.55 (ports 40001 and 40002), each health-checked and capped at 10000
# connections. Eight more server lines (ports 40003-40010) are commented out.
# NOTE(review): round-robin balancing is not sticky; confirm that no client
# needs to reach the same server on a later request.
backend usermsg_stream_servers
balance roundrobin
option forwardfor
timeout queue 5000
timeout server 86400000
timeout connect 86400000
server stream20 10.0.0.55:40001 weight 1 maxconn 10000 check
server stream21 10.0.0.55:40002 weight 1 maxconn 10000 check
#server stream22 10.0.0.55:40003 weight 1 maxconn 10000 check
#server stream23 10.0.0.55:40004 weight 1 maxconn 10000 check
#server stream24 10.0.0.55:40005 weight 1 maxconn 10000 check
#server stream25 10.0.0.55:40006 weight 1 maxconn 10000 check
#server stream26 10.0.0.55:40007 weight 1 maxconn 10000 check
#server stream27 10.0.0.55:40008 weight 1 maxconn 10000 check
#server stream28 10.0.0.55:40009 weight 1 maxconn 10000 check
#server stream29 10.0.0.55:40010 weight 1 maxconn 10000 check
# Backend for `channelmsg` requests: round-robin over two active servers on
# 10.0.0.55 (ports 50001 and 50002), each health-checked and capped at 20000
# connections. Eight more server lines (ports 50003-50010) are commented out;
# note that they specify maxconn 10000 rather than 20000.
backend channelmsg_stream_servers
balance roundrobin
option forwardfor
timeout queue 5000
timeout server 86400000
timeout connect 86400000
server stream30 10.0.0.55:50001 weight 1 maxconn 20000 check
server stream31 10.0.0.55:50002 weight 1 maxconn 20000 check
#server stream32 10.0.0.55:50003 weight 1 maxconn 10000 check
#server stream33 10.0.0.55:50004 weight 1 maxconn 10000 check
#server stream34 10.0.0.55:50005 weight 1 maxconn 10000 check
#server stream35 10.0.0.55:50006 weight 1 maxconn 10000 check
#server stream36 10.0.0.55:50007 weight 1 maxconn 10000 check
#server stream37 10.0.0.55:50008 weight 1 maxconn 10000 check
#server stream38 10.0.0.55:50009 weight 1 maxconn 10000 check
#server stream39 10.0.0.55:50010 weight 1 maxconn 10000 check
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment