Skip to content

Instantly share code, notes, and snippets.

@FLYBYME
Created November 16, 2011 23:15
Show Gist options
  • Select an option

  • Save FLYBYME/1371824 to your computer and use it in GitHub Desktop.

Select an option

Save FLYBYME/1371824 to your computer and use it in GitHub Desktop.
var utils = require('../lib/util')
var RpcModule = require('../lib/rpc').RpcModule
var events = require('events');
var util = require('util');
var net = require('net');
/**
 * A remote peer reached over a socket.
 *
 * Incoming data is treated as newline-delimited text: every complete line is
 * emitted as a 'recv' event, and 'recv' is wired to `this.recv`.
 *
 * @constructor
 * @param {Object} socket Connected socket; must emit 'data', 'error' and 'end'
 * @param {Object} server Local server object; its `id` is the initial `this.id`
 */
var Peer = function(socket, server) {
  events.EventEmitter.call(this);
  var self = this;

  /**
   * Reports socket errors instead of dropping them silently, and forwards
   * them to 'error' listeners registered on this peer (if there are any, so
   * that an unobserved error does not turn into an uncaught exception).
   * @private
   */
  function onError(error) {
    var reason = error && error.message ? error.message : error;
    console.error('Socket error for peer ' + self.id + ': ' + reason);
    if (self.listeners('error').length > 0) {
      self.emit('error', error);
    }
  }

  /**
   * Splits incoming data into lines and emits one 'recv' per complete line.
   * Text after the last newline is kept in `self.buffer` until the rest of
   * the line arrives. Iterative on purpose: a chunk holding many lines must
   * not grow the call stack.
   * NOTE(review): toString() is applied per chunk, so a multi-byte character
   * split across two chunks may be decoded incorrectly - confirm whether the
   * socket should use setEncoding('utf8').
   * @private
   */
  function onData(data) {
    var lines = data.toString().split('\n');
    // The last element is whatever follows the final newline (possibly '').
    var remainder = lines.pop();
    if (lines.length > 0) {
      // Previously buffered text is the head of the first complete line.
      lines[0] = self.buffer.join('') + lines[0];
      self.buffer = [];
      for (var i = 0; i < lines.length; i++) {
        self.emit('recv', lines[i]);
      }
    }
    if (remainder.length) {
      self.buffer.push(remainder);
    }
  }

  /**
   * Called when the remote side ends the connection. Currently a no-op.
   * NOTE(review): the peer is neither destroyed nor reported as ended here -
   * confirm whether this should call destroy().
   * @private
   */
  function onEnd() {
  }

  this.id = server.id;
  this.destroyed = false;
  this.socket = socket;
  this.server = server;
  this.buffer = [];
  socket.on('data', onData);
  socket.on('error', onError);
  socket.on('end', onEnd);
  this.on('recv', this.recv);
}
/**
* Peer inherits from EventEmitter so that instances can emit and listen for
* events; this file emits 'recv', 'discovered' and 'end' on peers.
*/
util.inherits(Peer, events.EventEmitter);
/**
 * Serializes a payload to JSON and writes it to this peer's socket as one
 * newline-terminated line.
 *
 * @param {Object} data The payload to serialize and transmit
 */
Peer.prototype.send = function(data) {
  var line = JSON.stringify(data);
  this.socket.write(line + '\n');
};
/**
 * Parses one received message and dispatches it on its `type` field.
 *
 * Handled types:
 *   - 'event'     : accepted but currently ignored
 *   - 'discovery' : passed to `this.discover`
 *   - 'rpc'       : `data.data` is passed to `this.rpc.requestEvent`
 *
 * Messages that are not valid JSON, carry no `type`, or carry an unknown
 * type are dropped. The input comes from the remote side, so nothing in here
 * is allowed to throw on malformed content.
 *
 * @param {String} message The raw JSON text of one message
 */
Peer.prototype.recv = function(message) {
  var data;
  try {
    data = JSON.parse(message);
  } catch (e) {
    // console.error replaces util.error, which current Node.js no longer has.
    console.error('Could not parse message: ' + message);
    return;
  }
  if (!data || !data.type) {
    return;
  }
  switch (data.type) {
    case 'event':
      // Event processing is not implemented.
      break;
    case 'discovery':
      this.discover(data);
      break;
    case 'rpc':
      // An rpc message can arrive before an rpc handler has been attached.
      if (this.rpc && typeof this.rpc.requestEvent === 'function') {
        this.rpc.requestEvent(data.data);
      } else {
        console.error('Dropping rpc message, no rpc handler attached to peer: ' + this.id);
      }
      break;
  }
};
/**
 * Runs one side of the discovery handshake.
 *
 * Called without an argument it records the socket's remote port and address
 * and sends this server's id and address to the remote side. Called with a
 * discovery message it adopts the remote id and advertised port, then emits
 * 'discovered'.
 *
 * @param {Object} [data] Discovery message received from the remote peer
 */
Peer.prototype.discover = function(data) {
  if (data) {
    // A discovery message from the remote side: adopt its identity.
    this.id = data.id;
    // The remote may send `address: null` or omit it; keep whatever port is
    // already recorded in that case instead of throwing on untrusted input.
    if (data.address && data.address.port !== undefined && data.address.port !== null) {
      this.port = data.address.port;
    }
    this.host = this.socket.remoteAddress;
    this.emit('discovered');
    return;
  }
  // No message yet: introduce ourselves.
  this.port = this.socket.remotePort;
  this.host = this.socket.remoteAddress;
  this.send({
    type : 'discovery',
    id : this.server.id,
    address : this.server.address()
  });
};
/**
 * Destroys the socket connection, marks the peer as destroyed and emits
 * 'end'. Calling it again on an already destroyed peer does nothing, so
 * 'end' listeners run once per peer.
 */
Peer.prototype.destroy = function() {
  if (this.destroyed) {
    return;
  }
  // console.log replaces util.log, which is deprecated and absent from
  // recent Node.js releases.
  console.log('Received destroy request for peer: ' + this.id);
  // Set the flag first so a re-entrant call made while the socket is being
  // torn down is ignored.
  this.destroyed = true;
  this.socket.destroy();
  this.emit('end');
};
/**
 * The local node: tracks connected peers and the modules exposed to them.
 *
 * Fields:
 *   - peers       : connected peers, keyed by peer id
 *   - superPeers  : reserved; not used elsewhere in this file
 *   - stack       : modules registered through `use`, keyed by module name
 *   - MessageChan : the server created by `createServer` (null until then)
 *   - DataChan    : reserved; not used elsewhere in this file
 *   - SuperChan   : reserved; not used elsewhere in this file
 *
 * @constructor
 */
var Network = function() {
  events.EventEmitter.call(this);
  this.peers = {};
  this.superPeers = {};
  this.stack = {};
  // Explicitly initialised: the bare `this.X` expressions that stood here
  // before were no-ops and created no properties.
  this.MessageChan = null;
  this.DataChan = null;
  this.SuperChan = null;
}
/**
* Network inherits from EventEmitter so that instances can emit and listen
* for events; this file emits 'peer' on the network.
*/
util.inherits(Network, events.EventEmitter);
/**
 * Wraps a connected socket in a Peer and starts discovery. Once the remote
 * side has identified itself the peer is stored in `this.peers`, an RPC
 * module is created for it, the built-in 'network' module and every module
 * from `this.stack` are exposed, and 'network.ready' is invoked on the remote
 * side. 'peer' is emitted on the network when that call succeeds.
 *
 * @private
 * @param {Object} socket A connected socket
 */
Network.prototype.addPeer = function(socket) {
  var self = this;
  var peer = new Peer(socket, this.MessageChan);
  socket.setNoDelay();
  peer.discover();

  /**
   * Once the discovery is finished, we can add it to the peers
   */
  function onDiscovered() {
    console.info('Peer with id:' + peer.id + ' discovered');
    self.peers[peer.id] = peer;
    var rpc = new RpcModule(peer);
    // Other code in this file reaches the RPC layer through `peer.rpc`;
    // attach it here unless RpcModule has already done so.
    if (!peer.rpc) {
      peer.rpc = rpc;
    }
    // Inside these handlers `this` is whatever RpcModule calls them with;
    // they rely on it providing `send` and `set`.
    var networkModule = {
      newPeer : function(id, port, host) {
        self.connectAsPeer(port, host);
        this.send('pass', true);
      },
      ready : function(timeReady) {
        self.sendPeerInfo(peer);
        this.send('pass', true);
      },
      peers : function() {
        this.set('peers', Object.keys(self.peers));
        this.send('pass', true);
      }
    };
    rpc.expose('network', networkModule);
    // Expose every module registered through `use`.
    var names = Object.keys(self.stack);
    for (var i = names.length - 1; i >= 0; i--) {
      rpc.expose(names[i], self.stack[names[i]]);
    }
    rpc.invoke('network.ready', [Date.now()], function(err, result) {
      if (err) {
        // Throwing from this asynchronous callback would take the whole
        // process down; drop the failing peer instead.
        console.error('network.ready failed for peer with id:' + peer.id + ': ' + (err.message || err));
        peer.destroy();
        return;
      }
      console.log(result);
      self.emit('peer', peer);
    });
  }

  /**
   * If the socket suddenly ends, then remove it immediately
   */
  function onEnd() {
    console.info('Removing peer with id:' + peer.id + ' due to disconnect');
    // Only remove the entry if it still refers to this peer; another
    // connection may have been registered under the same id since.
    if (self.peers[peer.id] === peer) {
      delete self.peers[peer.id];
    }
  }

  peer.on('end', onEnd);
  peer.once('discovered', onDiscovered);
};
/**
 * Opens an outgoing connection to another node and registers the socket as a
 * peer once connected. Does nothing except warn when a known peer already
 * has the same host and port.
 *
 * @param {Number} port Port to connect to
 * @param {String} [host] Host to connect to
 */
Network.prototype.connectAsPeer = function(port, host) {
  var self = this;
  var ids = Object.keys(this.peers);
  for (var i = ids.length - 1; i >= 0; i--) {
    var known = this.peers[ids[i]];
    if (known.host === host && known.port === port) {
      console.warn('Already connected to peer on port:' + port + ' with host:' + host + '. Ignoring.');
      return;
    }
  }
  var socket = net.connect(port, host);

  /**
   * A failed connection attempt emits 'error' on the socket. Without a
   * listener that becomes an uncaught exception, so report it instead.
   */
  function onConnectError(error) {
    var reason = error && error.message ? error.message : error;
    console.error('Could not connect to peer on port:' + port + ' with host:' + host + ': ' + reason);
  }

  /**
   * When we connect, register the socket as a peer
   */
  function onConnect() {
    // From here on the peer wrapper installs its own error handling.
    socket.removeListener('error', onConnectError);
    console.info('Peer connected on port:' + port + ' with host:' + host);
    self.addPeer(socket);
  }

  socket.on('error', onConnectError);
  socket.on('connect', onConnect);
};
/**
 * Builds the server that accepts incoming peer connections, tags it with a
 * freshly generated id, stores it on `this.MessageChan` and returns it.
 * The server is not put into listening state here.
 *
 * @return {Object} The newly created server
 */
Network.prototype.createServer = function() {
  var self = this;
  var messageServer = net.Server();
  messageServer.id = utils.uid();
  // Every incoming connection becomes a peer.
  messageServer.on('connection', function(socket) {
    console.info('Peer connection request received');
    self.addPeer(socket);
  });
  // Print some info once we are listening.
  messageServer.on('listening', function() {
    console.info('Servent listening on: ' + JSON.stringify(messageServer.address()));
  });
  self.MessageChan = messageServer;
  return messageServer;
}
/**
 * Tells every other known peer about a newly ready peer by invoking
 * 'network.newPeer' on them with the new peer's id, port and host. A peer
 * whose call fails, or does not answer with a truthy `pass`, is destroyed.
 *
 * @param {Peer} current The peer that has just become ready
 */
Network.prototype.sendPeerInfo = function(current) {
  var peers = this.peers;
  var currentId = current.id;
  // Network itself never gets an `id` in this file; the local identity is
  // the id of the server stored on MessageChan.
  var ourId = this.id;
  if (ourId === undefined && this.MessageChan) {
    ourId = this.MessageChan.id;
  }
  Object.keys(peers).forEach(function(id) {
    var peer = peers[id];
    if (peer.id === ourId || peer.id === currentId) {
      return;
    }
    // A peer without an RPC handle cannot be notified; skip it.
    if (!peer.rpc) {
      return;
    }
    peer.rpc.invoke('network.newPeer', [currentId, current.port, current.host], function(err, result) {
      // `result` may be missing altogether; treat that as a failed call.
      if (err || !result || !result.pass) {
        peer.destroy();
      }
    });
  });
};
/**
 * Starts this network node: creates the server, listens on `port` and
 * connects to the given peers. When the port is already in use the server
 * falls back to a system-assigned port (listen(0)) and connects, as a peer,
 * to whatever is listening on the original port.
 *
 * @param {Object|Array} [peerList] One `{port, host}` object or an array of them
 * @param {Number} [port=45550] Port to listen on
 */
Network.prototype.run = function(peerList, port) {
  // Work on this instance rather than on a global variable, so that any
  // Network object can be run.
  var self = this;
  var listenPort = port === undefined ? 45550 : port;
  var messageChan = self.createServer();
  messageChan.on('error', function(err) {
    if (err && err.code === 'EADDRINUSE') {
      messageChan.listen(0);
      self.connectAsPeer(listenPort);
      return;
    }
    // Other server errors used to vanish without a trace.
    console.error('Server error: ' + (err && err.message ? err.message : err));
  });
  messageChan.listen(listenPort);
  if (!peerList) {
    return;
  }
  var targets = Array.isArray(peerList) ? peerList : [peerList];
  for (var i = targets.length - 1; i >= 0; i--) {
    self.connectAsPeer(targets[i].port, targets[i].host);
  }
}
/**
 * Registers a module on this network by storing its handler in `this.stack`
 * under the module's name. A later registration with the same name replaces
 * the earlier one.
 *
 * @param {Object} module Descriptor with a `name` and a `handler`
 */
Network.prototype.use = function(module) {
  var key = module.name;
  this.stack[key] = module.handler;
}
/**
* Boot script.
*
* Creates a Network, registers a sample module named 'test' on it and starts
* it with no initial peers. console.time / console.timeEnd bracket these
* calls; timeEnd runs right after run() returns, so the reported time does
* not wait for anything that completes later.
*/
console.time('Boot time test')
// NOTE(review): the variable name is misspelled ("netowrk"); check for other
// references to it by name before renaming.
var netowrk = new Network();
// Factory for the sample module. privateA / privateB are reachable only
// through the closure of the handler below.
var testModule = function() {
var privateA = 'a';
var privateB = 'b';
return {
name : 'test', //Name the module
handler : {//can be an object or function
// Sets three values through `this.set` and finishes with `.send()` chained
// onto the last call. `this` is presumably the RPC call context supplied by
// RpcModule - confirm against ../lib/rpc.
test : function() {
this.set('test', true)
this.set('a', privateA)
this.set('b', privateB).send()
}
}
}
}
netowrk.use(testModule());
netowrk.run();
// Disabled usage examples for run(): an array of peers and a single peer.
// NOTE: 89898 lies outside the valid TCP port range (0-65535), so these
// values are illustrative only.
/*
netowrk.run([{
port : 89898,
host : 'a.wiyc.info'
}, {
port : 89898,
host : 'b.wiyc.info'
}]);
netowrk.run({
port : 89898,
host : 'wiyc.info'
});
*/
console.timeEnd('Boot time test')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment