Created
November 30, 2011 23:44
-
-
Save FLYBYME/1411962 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| var utils = require('./util') | |
| var RpcModule = require('./rpc').RpcModule | |
| var events = require('events'); | |
| var util = require('util'); | |
| var net = require('net'); | |
| var crypto = require('crypto'); | |
| /*** | |
| * | |
| * | |
| */ | |
| var Peer = function(socket, server) { | |
| events.EventEmitter.call(this); | |
| /** | |
| * Pass socket errors on up the chain. | |
| * @private | |
| */ | |
| function onError(error) { | |
| throw error | |
| } | |
| /** | |
| * Handle our incoming data | |
| * @private | |
| */ | |
| function onData(data) { | |
| data = data.toString(); | |
| if(data.indexOf('\n') > -1) { | |
| var message = this.buffer.join(''); | |
| data = data.split('\n'); | |
| message += data.shift(); | |
| var decipher = crypto.createDecipher(this.algorithm, this.key); | |
| var decrypted = decipher.update(message, 'hex', 'utf8') + decipher['final']('utf8'); | |
| this.buffer = []; | |
| this.emit('recv', decrypted); | |
| data = data.join('\n'); | |
| if(data.length) { | |
| onData.bind(this)(data); | |
| } | |
| } else { | |
| this.buffer.push(data); | |
| } | |
| } | |
| /** | |
| * If end is received, then destroy the socket | |
| */ | |
| function onEnd() { | |
| } | |
| this.id = server.id; | |
| this.destroyed = false; | |
| this.socket = socket; | |
| this.server = server; | |
| this.knowsList = {}; | |
| this.algorithm | |
| this.key | |
| this.buffer = []; | |
| socket.on('data', onData.bind(this)); | |
| socket.on('error', onError.bind(this)); | |
| socket.on('end', onEnd.bind(this)); | |
| this.on('recv', this.recv); | |
| } | |
| /** | |
| * Inherits from EventEmitter | |
| */ | |
| util.inherits(Peer, events.EventEmitter); | |
| /** | |
| * Sends a message across the socket | |
| * | |
| * @param {Object} data The data to send over the socket | |
| */ | |
| Peer.prototype.send = function(data) { | |
| var cipher = crypto.createCipher(this.algorithm, this.key); | |
| var encrypted = cipher.update(JSON.stringify(data), 'utf8', 'hex') + cipher['final']('hex') + '\n'; | |
| this.socket.writable ? this.socket.write(encrypted) : null; | |
| }; | |
| Peer.prototype.setKey = function(key) { | |
| this.key = key; | |
| } | |
| Peer.prototype.setAlgorithm = function(algorithm) { | |
| this.algorithm = algorithm; | |
| } | |
| /** | |
| * Receives a message | |
| * | |
| * @param {String} message The data to be received | |
| */ | |
| Peer.prototype.recv = function(message) { | |
| var data; | |
| try { | |
| data = JSON.parse(message); | |
| } catch(e) { | |
| util.error('Could not parse message: ' + message); | |
| } | |
| if(data && data.type) { | |
| switch(data.type) { | |
| case 'event': | |
| //this.process(data); | |
| break; | |
| //the standard events | |
| case 'discovery': | |
| this.discover(data); | |
| break; | |
| //gets information about a peer | |
| case 'rpc': | |
| this.rpc.requestEvent(data.data); | |
| break; | |
| //connects to a new peer | |
| } | |
| } | |
| }; | |
| Peer.prototype.knows = function(peer) { | |
| this.knowsList[peer.id] = peer; | |
| } | |
| Peer.prototype.doesNotKnows = function(peer) { | |
| delete this.knowsList[peer.id]; | |
| } | |
| /** | |
| * Discovers information about the peer | |
| * | |
| * @param {Object} data Incoming data from another peer | |
| */ | |
| Peer.prototype.discover = function(data) { | |
| //if there is data, then this is a discovery response | |
| if(data) { | |
| this.id = data.id; | |
| this.port = data.address.port; | |
| this.host = this.socket.remoteAddress; | |
| this.emit('discovered'); | |
| } else { | |
| this.port = this.socket.remotePort; | |
| this.host = this.socket.remoteAddress; | |
| this.send({ | |
| type : 'discovery', | |
| id : this.server.id, | |
| address : this.server.address() | |
| }); | |
| } | |
| }; | |
| /** | |
| * Destroys the socket connection | |
| */ | |
| Peer.prototype.destroy = function() { | |
| util.log('Received destroy request for peer: ' + this.id); | |
| this.socket.destroy(); | |
| this.destroyed = true; | |
| this.emit('end'); | |
| }; | |
| /**** | |
| * | |
| */ | |
| var Network = module.exports = function() { | |
| events.EventEmitter.call(this) | |
| // | |
| this.peers = {}; | |
| this.superPeers = {}; | |
| this.notPeers = {}; | |
| // | |
| this.algorithm | |
| this.key | |
| this.stack = {}; | |
| // | |
| this.server = null; | |
| this.isMaster = true; | |
| this.isMasterEligible = true; | |
| this.isMasterWeight = 0; | |
| this.isMasterMax = 1; | |
| } | |
| /** | |
| * Inherits from EventEmitter | |
| */ | |
| util.inherits(Network, events.EventEmitter); | |
| Network.prototype.setEligible = function(e) { | |
| this.isMasterEligible = !!e; | |
| } | |
| Network.prototype.setWeight = function(w) { | |
| return this.isMasterWeight = w; | |
| } | |
| /*** | |
| * @private | |
| */ | |
| Network.prototype.addPeer = function(socket) { | |
| var peer = new Peer(socket, this.server); | |
| // | |
| peer.setKey(this.key); | |
| peer.setAlgorithm(this.algorithm); | |
| // | |
| socket.setNoDelay(); | |
| peer.discover(); | |
| /** | |
| * Once the discovery is finished, we can add it to the peers | |
| */ | |
| function onDiscovered() { | |
| console.info('Peer with id:' + peer.id + ' discovered'); | |
| var rpc = new RpcModule(peer) | |
| /** | |
| * | |
| */ | |
| var keys = Object.keys(this.stack); | |
| for(var i = keys.length - 1; i >= 0; i--) { | |
| var name = keys[i]; | |
| rpc.expose(name, this.stack[name]); | |
| }; | |
| /** | |
| * | |
| */ | |
| var self = this; | |
| rpc.invoke('network.ready', [Date.now()], function(err, result) { | |
| if(err) { | |
| throw err.message; | |
| } | |
| self.peers[peer.id] = peer; | |
| self.emit('peer', peer, rpc); | |
| }); | |
| this.loadCoreModules(peer, rpc) | |
| }; | |
| /** | |
| * Got a new peer | |
| */ | |
| function onNewMessagePeer(data) { | |
| this.connectAsPeer(data.port, data.host); | |
| }; | |
| /** | |
| * If the socket suddenyl ends, then remove it immediately | |
| */ | |
| function onEnd() { | |
| console.info('Removing peer with id:' + peer.id + ' due to disconnect'); | |
| delete this.peers[peer.id]; | |
| this.invoke('network.peers.doesNotKnow', [peer.id], function(data) { | |
| }) | |
| }; | |
| peer.on('end', onEnd.bind(this)); | |
| peer.once('discovered', onDiscovered.bind(this)); | |
| //peer.on('newPeer', onNewMessagePeer.bind(this)); | |
| }; | |
| Network.prototype.testPeer = function(id, port, host) { | |
| return !!~Object.keys(this.peers).indexOf(id); | |
| }; | |
| /** | |
| * Sends current servent peers information about the new peer | |
| * | |
| * @param {Peer} current The peer to send the information to | |
| */ | |
| Network.prototype.sendPeerInfo = function(current) { | |
| var peers = this.peers; | |
| var currentId = current.id; | |
| var ourId = this.id; | |
| var callBack = function(err, result) { | |
| if(err || !result.pass) { | |
| console.log('Peer failed ready status. Peer ID: ' + peer.id) | |
| peer.destroy(); | |
| } | |
| }; | |
| Object.keys(peers).forEach(function(id, index) { | |
| var peer = peers[id]; | |
| if(!(peer.id === ourId || peer.id === currentId)) { | |
| peer.rpc.invoke('network.peers.knows', [currentId, current.port, current.host], callBack); | |
| } | |
| }) | |
| }; | |
| /*** | |
| * | |
| * | |
| * | |
| */ | |
| Network.prototype.loadCoreModules = function(peer, rpc) { | |
| var self = this; | |
| /** | |
| * | |
| */ | |
| rpc.expose('network', { | |
| ready : function(timeReady) { | |
| self.sendPeerInfo(peer); | |
| this.send('pass', true); | |
| }, | |
| query : function(id) { | |
| if(self.testPeer(id)) { | |
| var p = self.peers[id]; | |
| this.set('id', id); | |
| this.set('port', p.port); | |
| this.set('host', p.host); | |
| this.send('pass', true); | |
| } else { | |
| this.send('pass', false); | |
| } | |
| }, | |
| peers : { | |
| 'new' : function(id, port, host) { | |
| self.connectAsPeer(port, host); | |
| this.send('pass', true); | |
| }, | |
| list : function(id, port, host) { | |
| this.set('peers', Object.keys(self.peers)); | |
| this.send('pass', true); | |
| }, | |
| knows : function(id, port, host) { | |
| peer.knows({ | |
| id : id, | |
| port : port, | |
| host : host | |
| }); | |
| this.send('pass', true); | |
| }, | |
| doesNotKnow : function(id, port, host) { | |
| peer.doesNotKnow(id); | |
| this.send('pass', true); | |
| } | |
| }, | |
| master : { | |
| isSuper : function() { | |
| this.set('isMaster', self.isMaster) | |
| this.set('isMasterEligible', self.isMasterEligible); | |
| this.set('isMasterWeight', self.isMasterWeight); | |
| this.set('id', self.server.id); | |
| this.send('pass', true); | |
| }, | |
| demote : function() { | |
| this.set('id', self.server.id); | |
| this.send('pass', true); | |
| }, | |
| demoted : function() { | |
| this.set('id', self.server.id); | |
| this.send('pass', true); | |
| }, | |
| promote : function() { | |
| } | |
| } | |
| }); | |
| } | |
| /*** | |
| * | |
| * | |
| * | |
| */ | |
| Network.prototype.addSuperPeer = function(id) { | |
| if(this.peers.hasOwnProperty(id)) { | |
| var peer = this.peers[id]; | |
| this.superPeer[id] = peer; | |
| } | |
| }; | |
| Network.prototype.removeSuperPeer = function(id) { | |
| if(this.superPeer.hasOwnProperty(id)) { | |
| delete this.superPeer[id]; | |
| } | |
| }; | |
| Network.prototype.superPeerAdd = function(superID, id, port, host) { | |
| if(this.peers.hasOwnProperty(id)) { | |
| var peer = this.peers[id]; | |
| } else if(this.notPeers.hasOwnProperty(id)) { | |
| return; | |
| this.notPeers[id] = { | |
| superID : superID, | |
| id : id, | |
| port : port, | |
| host : host | |
| }; | |
| } else { | |
| this.notPeers[id] = { | |
| superID : superID, | |
| id : id, | |
| port : port, | |
| host : host | |
| }; | |
| } | |
| }; | |
| Network.prototype.connectAsPeer = function(port, host) { | |
| var ids = Object.keys(this.peers); | |
| var peer; | |
| for(var i = ids.length - 1; i >= 0; i--) { | |
| peer = this.peers[ids[i]]; | |
| if(peer.host === host && peer.port === port) { | |
| //console.warn('Already connected to peer on port:' + port + ' with host:' + host + '. Ignoring.'); | |
| return; | |
| } | |
| }; | |
| var socket = net.connect(port, host); | |
| /** | |
| * When we connect, register the socket as a peer | |
| */ | |
| function onConnect() { | |
| console.info('Peer connected on port:' + port + ' with host:' + host); | |
| this.addPeer(socket); | |
| } | |
| socket.on('connect', onConnect.bind(this)); | |
| }; | |
| /** | |
| * | |
| * | |
| * | |
| */ | |
| Network.prototype.createServer = function() { | |
| var server = net.Server(); | |
| /** | |
| * Listens for new connections and adds a peer for each one | |
| */ | |
| function onConnection(socket) { | |
| console.info('Peer connection request received'); | |
| this.addPeer(socket); | |
| } | |
| /** | |
| * Print some info once we are listening | |
| */ | |
| function onListen() { | |
| console.info('Servent listening on: ' + JSON.stringify(server.address())); | |
| } | |
| server.id = utils.uid(); | |
| server.on('connection', onConnection.bind(this)); | |
| server.on('listening', onListen.bind(this)); | |
| return this.server = server; | |
| } | |
| /** | |
| * | |
| * | |
| * | |
| */ | |
| Network.prototype.run = function(peerList) { | |
| var messageChan = this.createServer(); | |
| var self = this; | |
| var port = this.port = 45550; | |
| messageChan.on('error', function(err) { | |
| if(err.code === 'EADDRINUSE') { | |
| messageChan.listen(0); | |
| if(!peerList) { | |
| self.connectAsPeer(port); | |
| } | |
| } | |
| }) | |
| messageChan.listen(port); | |
| if(peerList) { | |
| if(Array.isArray(peerList)) { | |
| for(var i = peerList.length - 1; i >= 0; i--) { | |
| this.connectAsPeer(peerList[i].port, peerList[i].host); | |
| }; | |
| } else { | |
| this.connectAsPeer(peerList.port, peerList.host); | |
| } | |
| } | |
| }; | |
| Network.prototype.use = function(module) { | |
| var name = module.name; | |
| var handler = module.handler; | |
| this.stack[name] = handler; | |
| }; | |
| Network.prototype.invoke = function(method, params, callBack) { | |
| var data = []; | |
| var peers = this.peers; | |
| var ids = Object.keys(peers); | |
| var length = ids.length; | |
| var count = 0; | |
| var allCallBack = function(err, result) { | |
| data.push({ | |
| err : err, | |
| result : result | |
| }); | |
| if(++count === length) { | |
| callBack(data); | |
| } | |
| }; | |
| var self = this; | |
| ids.forEach(function(id) { | |
| var peer = self.peers[id]; | |
| peer.rpc.invoke(method, params, allCallBack); | |
| }); | |
| }; | |
| Network.prototype.setKey = function(key) { | |
| this.key = key; | |
| } | |
| Network.prototype.setAlgorithm = function(algorithm) { | |
| this.algorithm = algorithm; | |
| } | |
| /** | |
| * | |
| * | |
| * | |
| * | |
| * | |
| * | |
| */ |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment