Skip to content

Instantly share code, notes, and snippets.

@FLYBYME
Created November 30, 2011 23:44
Show Gist options
  • Select an option

  • Save FLYBYME/1411962 to your computer and use it in GitHub Desktop.

Select an option

Save FLYBYME/1411962 to your computer and use it in GitHub Desktop.
var utils = require('./util')
var RpcModule = require('./rpc').RpcModule
var events = require('events');
var util = require('util');
var net = require('net');
var crypto = require('crypto');
/**
 * A remote peer reached over an already connected socket.
 *
 * Socket data is consumed as newline-delimited, hex-encoded cipher text. Each
 * complete line is decrypted with the peer's algorithm and key and re-emitted
 * as a 'recv' event, which is wired to this.recv.
 *
 * @param {Object} socket The connected socket used to talk to the peer
 * @param {Object} server The local server; its id seeds this.id
 */
var Peer = function(socket, server) {
  events.EventEmitter.call(this);

  /**
   * Pass socket errors on up the chain by re-emitting them on the peer.
   * Throwing from inside the socket's own event handler cannot be caught by
   * any caller. EventEmitter still throws when nobody listens for 'error',
   * so unhandled errors remain fatal.
   * @private
   */
  function onError(error) {
    this.emit('error', error);
  }

  /**
   * Decrypts one hex-encoded line into its utf8 plain text.
   * NOTE: crypto.createDecipher is a deprecated API; it is kept because
   * switching ciphers would change the wire format.
   * @private
   */
  function decrypt(message) {
    var decipher = crypto.createDecipher(this.algorithm, this.key);
    return decipher.update(message, 'hex', 'utf8') + decipher['final']('utf8');
  }

  /**
   * Handle our incoming data: split it on newlines, decrypt every complete
   * line and keep any trailing partial line buffered for the next chunk.
   * @private
   */
  function onData(data) {
    var pending = data.toString();
    var newline = pending.indexOf('\n');
    while(newline > -1) {
      var message = this.buffer.join('') + pending.slice(0, newline);
      pending = pending.slice(newline + 1);
      // Reset before decrypting so a bad line cannot leave stale fragments
      // in front of the next message.
      this.buffer = [];
      var decrypted = null;
      var failure = null;
      try {
        decrypted = decrypt.call(this, message);
      } catch(e) {
        failure = e;
      }
      // Emit outside the try block so exceptions raised by listeners are not
      // mistaken for decryption failures.
      if(failure) {
        this.emit('error', failure);
      } else {
        this.emit('recv', decrypted);
      }
      newline = pending.indexOf('\n');
    }
    if(pending.length) {
      this.buffer.push(pending);
    }
  }

  /**
   * Called when the remote side ends the connection. Currently a no-op.
   * NOTE(review): the previous comment said the socket is destroyed here,
   * but no code does so - confirm whether this.destroy() should be called.
   * @private
   */
  function onEnd() {
  }

  this.id = server.id;
  this.destroyed = false;
  this.socket = socket;
  this.server = server;
  this.knowsList = {};
  // Both are assigned later through setAlgorithm() and setKey().
  this.algorithm = undefined;
  this.key = undefined;
  this.buffer = [];

  socket.on('data', onData.bind(this));
  socket.on('error', onError.bind(this));
  socket.on('end', onEnd.bind(this));
  this.on('recv', this.recv);
};
/**
 * Inherits from EventEmitter, which provides the on()/emit() methods the
 * Peer constructor and its prototype methods rely on.
 */
util.inherits(Peer, events.EventEmitter);
/**
 * Sends a message across the socket.
 *
 * The message is serialised as JSON, encrypted with the peer's algorithm and
 * key, hex-encoded and terminated with a newline. The line is only written
 * while the socket is still writable.
 *
 * @param {Object} data The data to send over the socket
 */
Peer.prototype.send = function(data) {
  var cipher = crypto.createCipher(this.algorithm, this.key);
  var line = cipher.update(JSON.stringify(data), 'utf8', 'hex');
  line += cipher['final']('hex');
  line += '\n';
  if(this.socket.writable) {
    this.socket.write(line);
  }
};
/**
 * Stores the key used when creating ciphers for this peer.
 *
 * @param {String} key The key to remember on the peer
 */
Peer.prototype.setKey = function(key) {
  this.key = key;
};
/**
 * Stores the cipher algorithm name used when creating ciphers for this peer.
 *
 * @param {String} algorithm The algorithm name to remember on the peer
 */
Peer.prototype.setAlgorithm = function(algorithm) {
  this.algorithm = algorithm;
};
/**
 * Receives a decrypted message and dispatches it on its type.
 *
 * Messages that are not valid JSON are logged to stderr and dropped, as are
 * messages without a type. Recognised types are 'event' (currently ignored),
 * 'discovery' (handed to this.discover) and 'rpc' (handed to this.rpc).
 *
 * @param {String} message The JSON text to be received
 */
Peer.prototype.recv = function(message) {
  var data;
  try {
    data = JSON.parse(message);
  } catch(e) {
    // util.error() has been removed from Node, so calling it here turned
    // every malformed message into a TypeError; console.error writes to
    // stderr just the same.
    console.error('Could not parse message: ' + message);
    return;
  }
  if(!data || !data.type) {
    return;
  }
  switch(data.type) {
    case 'event':
      // events are not processed yet
      break;
    case 'discovery':
      // discovery information sent by the remote peer
      this.discover(data);
      break;
    case 'rpc':
      // this.rpc is not assigned anywhere in Peer itself (presumably the rpc
      // module attaches it - verify). Guard so that an rpc message arriving
      // before then is dropped instead of throwing.
      if(this.rpc) {
        this.rpc.requestEvent(data.data);
      } else {
        console.error('Dropping rpc message, no rpc handler on peer: ' + this.id);
      }
      break;
  }
};
/**
 * Records another peer in this peer's knowsList, keyed by that peer's id.
 *
 * @param {Object} peer The peer to record; only peer.id is read here
 */
Peer.prototype.knows = function(peer) {
  var known = this.knowsList;
  known[peer.id] = peer;
};
/**
 * Removes a peer from this peer's knowsList.
 *
 * Accepts either a peer-like object (its id is used) or a bare id, so callers
 * that only hold an id do not have to wrap it in an object.
 *
 * @param {Object|String} peer The peer, or the id of the peer, to forget
 */
Peer.prototype.doesNotKnows = function(peer) {
  var id = (peer !== null && typeof peer === 'object') ? peer.id : peer;
  delete this.knowsList[id];
};
/**
 * Alias without the trailing "s", matching the name of the
 * 'network.peers.doesNotKnow' rpc method.
 */
Peer.prototype.doesNotKnow = Peer.prototype.doesNotKnows;
/**
 * Discovers information about the peer.
 *
 * Called without data it starts discovery: the socket's remote port and
 * address are recorded and our own server id and address are sent to the
 * peer. Called with data it treats the data as the peer's discovery message,
 * records its id, port and host, and emits 'discovered'.
 *
 * @param {Object} data Incoming data from another peer
 */
Peer.prototype.discover = function(data) {
  if(!data) {
    this.port = this.socket.remotePort;
    this.host = this.socket.remoteAddress;
    this.send({
      type : 'discovery',
      id : this.server.id,
      address : this.server.address()
    });
    return;
  }
  // The sender fills address from its server.address(), which is null until
  // that server is listening; a malformed message may omit it entirely.
  // Ignore such messages instead of throwing on data.address.port.
  if(!data.address) {
    console.error('Ignoring discovery message without an address from peer: ' + this.id);
    return;
  }
  this.id = data.id;
  this.port = data.address.port;
  this.host = this.socket.remoteAddress;
  this.emit('discovered');
};
/**
 * Destroys the socket connection and emits 'end'.
 *
 * The call is idempotent: once the peer is marked destroyed, further calls
 * return immediately so 'end' listeners run only once per peer.
 */
Peer.prototype.destroy = function() {
  if(this.destroyed) {
    return;
  }
  util.log('Received destroy request for peer: ' + this.id);
  // Mark first so a re-entrant call made while tearing down is ignored.
  this.destroyed = true;
  this.socket.destroy();
  this.emit('end');
};
/**
 * The local node of the peer network; exported as the module itself.
 *
 * The constructor only initialises state. The cipher algorithm and key are
 * left unset until setAlgorithm() and setKey() are called.
 */
var Network = module.exports = function() {
  events.EventEmitter.call(this);

  // Peer bookkeeping.
  this.peers = {};
  this.superPeers = {};
  this.notPeers = {};

  // Handlers registered through use(), keyed by module name.
  this.stack = {};

  // Assigned by createServer().
  this.server = null;

  // Master state.
  this.isMaster = true;
  this.isMasterEligible = true;
  this.isMasterWeight = 0;
  this.isMasterMax = 1;
};
/**
 * Inherits from EventEmitter, which provides the emit() method Network uses
 * to announce newly registered peers.
 */
util.inherits(Network, events.EventEmitter);
/**
 * Sets whether this node is eligible to be master.
 *
 * @param {*} e Any value; it is coerced to a boolean
 */
Network.prototype.setEligible = function(e) {
  this.isMasterEligible = Boolean(e);
};
/**
 * Sets this node's master weight.
 *
 * @param {*} w The weight to store
 * @return {*} The weight that was stored
 */
Network.prototype.setWeight = function(w) {
  this.isMasterWeight = w;
  return w;
};
/**
 * Wraps a connected socket in a Peer and starts discovery on it.
 *
 * Once the peer reports 'discovered', an RpcModule is created for it, every
 * handler registered through use() is exposed, and 'network.ready' is
 * invoked; when that call succeeds the peer is stored in this.peers and a
 * 'peer' event is emitted with the peer and its rpc module.
 *
 * NOTE(review): other methods read peer.rpc, which is not assigned here -
 * presumably RpcModule attaches itself to the peer; confirm.
 *
 * @private
 * @param {Object} socket The connected socket of the new peer
 */
Network.prototype.addPeer = function(socket) {
  var self = this;
  var peer = new Peer(socket, this.server);

  peer.setKey(this.key);
  peer.setAlgorithm(this.algorithm);

  socket.setNoDelay();
  peer.discover();

  /**
   * Once the discovery is finished, we can add it to the peers
   */
  function onDiscovered() {
    console.info('Peer with id:' + peer.id + ' discovered');
    var rpc = new RpcModule(peer);

    // Expose the registered modules, last registered first.
    var names = Object.keys(self.stack);
    for(var i = names.length - 1; i >= 0; i--) {
      rpc.expose(names[i], self.stack[names[i]]);
    }

    rpc.invoke('network.ready', [Date.now()], function(err, result) {
      if(err) {
        // Throw a real Error rather than a bare string so the failure
        // carries a stack trace.
        throw (err instanceof Error ? err : new Error(err.message || String(err)));
      }
      self.peers[peer.id] = peer;
      self.emit('peer', peer, rpc);
    });
    self.loadCoreModules(peer, rpc);
  }

  /**
   * If the socket suddenly ends, then remove it immediately
   */
  function onEnd() {
    console.info('Removing peer with id:' + peer.id + ' due to disconnect');
    delete self.peers[peer.id];
    self.invoke('network.peers.doesNotKnow', [peer.id], function(data) {
    });
  }

  peer.on('end', onEnd);
  peer.once('discovered', onDiscovered);
};
/**
 * Reports whether a peer with the given id is registered in this.peers.
 *
 * @param {String} id The peer id to look for
 * @param {Number} port Not used
 * @param {String} host Not used
 * @return {Boolean} true when the id is one of the keys of this.peers
 */
Network.prototype.testPeer = function(id, port, host) {
  var knownIds = Object.keys(this.peers);
  return knownIds.indexOf(id) !== -1;
};
/**
 * Sends current servent peers information about the new peer.
 *
 * Every registered peer other than ourselves and the new peer receives a
 * 'network.peers.knows' call carrying the new peer's id, port and host. A
 * peer that answers with an error or without a passing result is destroyed.
 *
 * @param {Peer} current The newly joined peer to announce to the others
 */
Network.prototype.sendPeerInfo = function(current) {
  var peers = this.peers;
  var currentId = current.id;
  // Our own id lives on the server object; fall back to this.id when no
  // server has been created.
  var ourId = this.server ? this.server.id : this.id;

  Object.keys(peers).forEach(function(id) {
    var peer = peers[id];
    if(peer.id === ourId || peer.id === currentId) {
      return;
    }
    // One callback per peer: a shared callback has no way of knowing which
    // peer the failed response belongs to.
    peer.rpc.invoke('network.peers.knows', [currentId, current.port, current.host], function(err, result) {
      if(err || !result || !result.pass) {
        console.log('Peer failed ready status. Peer ID: ' + peer.id);
        peer.destroy();
      }
    });
  });
};
/**
 * Exposes the built-in 'network' rpc module on a peer's rpc channel.
 *
 * Handlers are invoked with `this` providing set() and send() - presumably
 * the rpc call context supplied by RpcModule; confirm against that module.
 *
 * @param {Peer} peer The peer the rpc channel belongs to
 * @param {Object} rpc The rpc module to expose the handlers on
 */
Network.prototype.loadCoreModules = function(peer, rpc) {
  var self = this;

  rpc.expose('network', {
    // The remote peer is ready: announce it to the peers we already have.
    ready : function(timeReady) {
      self.sendPeerInfo(peer);
      this.send('pass', true);
    },
    // Looks up the port and host of a registered peer by id.
    query : function(id) {
      if(self.testPeer(id)) {
        var p = self.peers[id];
        this.set('id', id);
        this.set('port', p.port);
        this.set('host', p.host);
        this.send('pass', true);
      } else {
        this.send('pass', false);
      }
    },
    peers : {
      // Asks us to connect to another peer.
      'new' : function(id, port, host) {
        self.connectAsPeer(port, host);
        this.send('pass', true);
      },
      // Lists the ids of the peers we are connected to.
      list : function(id, port, host) {
        this.set('peers', Object.keys(self.peers));
        this.send('pass', true);
      },
      // Records that the remote peer knows the described peer.
      knows : function(id, port, host) {
        peer.knows({
          id : id,
          port : port,
          host : host
        });
        this.send('pass', true);
      },
      // Forgets a peer previously recorded through knows.
      doesNotKnow : function(id, port, host) {
        // The Peer method is named doesNotKnows and reads the id from the
        // object it is given.
        peer.doesNotKnows({
          id : id
        });
        this.send('pass', true);
      }
    },
    master : {
      // Reports our master state.
      isSuper : function() {
        this.set('isMaster', self.isMaster);
        this.set('isMasterEligible', self.isMasterEligible);
        this.set('isMasterWeight', self.isMasterWeight);
        this.set('id', self.server.id);
        this.send('pass', true);
      },
      demote : function() {
        this.set('id', self.server.id);
        this.send('pass', true);
      },
      demoted : function() {
        this.set('id', self.server.id);
        this.send('pass', true);
      },
      // Not implemented: no response is sent.
      promote : function() {
      }
    }
  });
};
/**
 * Marks a registered peer as a super peer.
 *
 * Ids that are not present in this.peers are ignored.
 *
 * @param {String} id The id of the peer to mark
 */
Network.prototype.addSuperPeer = function(id) {
  // Call hasOwnProperty through Object.prototype so a peer id can never
  // shadow it.
  if(Object.prototype.hasOwnProperty.call(this.peers, id)) {
    // The constructor creates this.superPeers; "superPeer" does not exist.
    this.superPeers[id] = this.peers[id];
  }
};
/**
 * Removes a peer from the super peers.
 *
 * Ids that are not present in this.superPeers are ignored.
 *
 * @param {String} id The id of the super peer to remove
 */
Network.prototype.removeSuperPeer = function(id) {
  // The constructor creates this.superPeers; "superPeer" does not exist.
  if(Object.prototype.hasOwnProperty.call(this.superPeers, id)) {
    delete this.superPeers[id];
  }
};
/**
 * Records a peer reported by a super peer that we are not connected to.
 *
 * Nothing happens when the id is already one of our peers or has already
 * been recorded in this.notPeers; the first record for an id is kept.
 * NOTE(review): the earlier code had an unreachable assignment after its
 * return for the already-recorded case - confirm that keeping the first
 * record (rather than overwriting it) is intended.
 *
 * @param {String} superID The id of the super peer that reported the peer
 * @param {String} id The id of the reported peer
 * @param {Number} port The port of the reported peer
 * @param {String} host The host of the reported peer
 */
Network.prototype.superPeerAdd = function(superID, id, port, host) {
  var has = Object.prototype.hasOwnProperty;
  if(has.call(this.peers, id) || has.call(this.notPeers, id)) {
    return;
  }
  this.notPeers[id] = {
    superID : superID,
    id : id,
    port : port,
    host : host
  };
};
/**
 * Opens an outgoing connection and registers it as a peer once connected.
 *
 * Nothing happens when a registered peer already has the same host and port.
 * A failed connection attempt is logged as a warning instead of raising an
 * unhandled 'error' event on the socket.
 *
 * @param {Number} port The port to connect to
 * @param {String} host The host to connect to
 */
Network.prototype.connectAsPeer = function(port, host) {
  var self = this;
  var ids = Object.keys(this.peers);
  for(var i = ids.length - 1; i >= 0; i--) {
    var known = this.peers[ids[i]];
    if(known.host === host && known.port === port) {
      // Already connected to this peer; ignore the request.
      return;
    }
  }

  var socket = net.connect(port, host);

  /**
   * Until the connection is established nothing else listens for socket
   * errors, so an unreachable peer would otherwise take the process down.
   */
  function onConnectError(error) {
    console.warn('Could not connect to peer on port:' + port + ' with host:' + host + ' (' + error.message + ')');
  }

  /**
   * When we connect, register the socket as a peer
   */
  function onConnect() {
    // From here on the Peer created by addPeer handles socket errors.
    socket.removeListener('error', onConnectError);
    console.info('Peer connected on port:' + port + ' with host:' + host);
    self.addPeer(socket);
  }

  socket.on('error', onConnectError);
  socket.on('connect', onConnect);
};
/**
 * Creates the server that accepts incoming peer connections.
 *
 * The server is given a fresh id from utils.uid(), every incoming connection
 * is registered through addPeer, and the server is stored on this.server.
 * The server is not put into listening state here.
 *
 * @return {Object} The created server
 */
Network.prototype.createServer = function() {
  var self = this;
  var server = net.Server();

  server.id = utils.uid();

  // Listens for new connections and adds a peer for each one
  server.on('connection', function(socket) {
    console.info('Peer connection request received');
    self.addPeer(socket);
  });

  // Print some info once we are listening
  server.on('listening', function() {
    console.info('Servent listening on: ' + JSON.stringify(server.address()));
  });

  this.server = server;
  return server;
};
/**
 * Starts the node: creates the server, listens, and connects to the given
 * peers.
 *
 * When the port is already in use the server listens on a random free port
 * instead and, if no peer list was given, connects to whoever holds the
 * requested port on this machine. Other server errors are logged.
 *
 * @param {Array|Object} [peerList] One {port, host} description or a list of them
 * @param {Number} [port=45550] The port to try to listen on
 */
Network.prototype.run = function(peerList, port) {
  var self = this;
  var messageChan = this.createServer();
  var listenPort = this.port = (port === undefined ? 45550 : port);

  messageChan.on('error', function(err) {
    if(err.code !== 'EADDRINUSE') {
      // Report errors that are not handled here instead of dropping them.
      console.error('Servent server error: ' + (err && err.message ? err.message : err));
      return;
    }
    messageChan.listen(0);
    if(!peerList) {
      self.connectAsPeer(listenPort);
    }
  });
  messageChan.listen(listenPort);

  if(peerList) {
    var list = Array.isArray(peerList) ? peerList : [peerList];
    for(var i = list.length - 1; i >= 0; i--) {
      this.connectAsPeer(list[i].port, list[i].host);
    }
  }
};
/**
 * Registers a module whose handler is stored in this.stack under the
 * module's name.
 *
 * @param {Object} module An object with a name and a handler property
 */
Network.prototype.use = function(module) {
  this.stack[module.name] = module.handler;
};
/**
 * Invokes an rpc method on every registered peer and collects the answers.
 *
 * The callback receives one {err, result} entry per peer, in the order the
 * answers arrive, once every peer has answered. With no registered peers it
 * receives an empty list on the next tick instead of never being called.
 *
 * @param {String} method The rpc method name
 * @param {Array} params The parameters passed to the method
 * @param {Function} callBack Called with the list of collected answers
 */
Network.prototype.invoke = function(method, params, callBack) {
  var peers = this.peers;
  var ids = Object.keys(peers);
  var remaining = ids.length;
  var data = [];
  var done = (typeof callBack === 'function') ? callBack : function() {};

  if(remaining === 0) {
    // Stay asynchronous so callers see the same ordering as with peers.
    process.nextTick(function() {
      done(data);
    });
    return;
  }

  ids.forEach(function(id) {
    peers[id].rpc.invoke(method, params, function(err, result) {
      data.push({
        err : err,
        result : result
      });
      if(--remaining === 0) {
        done(data);
      }
    });
  });
};
/**
 * Stores the key that addPeer hands to every peer it creates.
 *
 * @param {String} key The key to remember on the network
 */
Network.prototype.setKey = function(key) {
  this.key = key;
};
/**
 * Stores the cipher algorithm name that addPeer hands to every peer it
 * creates.
 *
 * @param {String} algorithm The algorithm name to remember on the network
 */
Network.prototype.setAlgorithm = function(algorithm) {
  this.algorithm = algorithm;
};
/**
*
*
*
*
*
*
*/
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment