Created
November 16, 2011 23:15
-
-
Save FLYBYME/1371824 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| var utils = require('../lib/util') | |
| var RpcModule = require('../lib/rpc').RpcModule | |
| var events = require('events'); | |
| var util = require('util'); | |
| var net = require('net'); | |
| /*** | |
| * | |
| * | |
| */ | |
| var Peer = function(socket, server) { | |
| events.EventEmitter.call(this); | |
| /** | |
| * Pass socket errors on up the chain. | |
| * @private | |
| */ | |
| function onError(error) { | |
| } | |
| /** | |
| * Handle our incoming data | |
| * @private | |
| */ | |
| function onData(data) { | |
| data = data.toString(); | |
| if(data.indexOf('\n') > -1) { | |
| var message = this.buffer.join(''); | |
| data = data.split('\n'); | |
| message += data.shift(); | |
| this.buffer = []; | |
| this.emit('recv', message); | |
| data = data.join('\n'); | |
| if(data.length) { | |
| onData.bind(this)(data); | |
| } | |
| } else { | |
| this.buffer.push(data); | |
| } | |
| } | |
| /** | |
| * If end is received, then destroy the socket | |
| */ | |
| function onEnd() { | |
| } | |
| this.id = server.id; | |
| this.destroyed = false; | |
| this.socket = socket; | |
| this.server = server; | |
| this.buffer = []; | |
| socket.on('data', onData.bind(this)); | |
| socket.on('error', onError.bind(this)); | |
| socket.on('end', onEnd.bind(this)); | |
| this.on('recv', this.recv); | |
| } | |
| /** | |
| * Inherits from EventEmitter | |
| */ | |
| util.inherits(Peer, events.EventEmitter); | |
| /** | |
| * Sends a message across the socket | |
| * | |
| * @param {Object} data The data to send over the socket | |
| */ | |
| Peer.prototype.send = function(data) { | |
| var message = JSON.stringify(data) + '\n'; | |
| this.socket.write(message); | |
| }; | |
| /** | |
| * Receives a message | |
| * | |
| * @param {String} message The data to be received | |
| */ | |
| Peer.prototype.recv = function(message) { | |
| var data; | |
| try { | |
| data = JSON.parse(message); | |
| } catch(e) { | |
| util.error('Could not parse message: ' + message); | |
| } | |
| if(data && data.type) { | |
| switch(data.type) { | |
| case 'event': | |
| //this.process(data); | |
| break; | |
| //the standard events | |
| case 'discovery': | |
| this.discover(data); | |
| break; | |
| //gets information about a peer | |
| case 'rpc': | |
| this.rpc.requestEvent(data.data); | |
| break; | |
| //connects to a new peer | |
| } | |
| } | |
| }; | |
| /** | |
| * Discovers information about the peer | |
| * | |
| * @param {Object} data Incoming data from another peer | |
| */ | |
| Peer.prototype.discover = function(data) { | |
| //if there is data, then this is a discovery response | |
| if(data) { | |
| this.id = data.id; | |
| this.port = data.address.port; | |
| this.host = this.socket.remoteAddress; | |
| this.emit('discovered'); | |
| } else { | |
| this.port = this.socket.remotePort; | |
| this.host = this.socket.remoteAddress; | |
| this.send({ | |
| type : 'discovery', | |
| id : this.server.id, | |
| address : this.server.address() | |
| }); | |
| } | |
| }; | |
| /** | |
| * Destroys the socket connection | |
| */ | |
| Peer.prototype.destroy = function() { | |
| util.log('Received destroy request for peer: ' + this.id); | |
| this.socket.destroy(); | |
| this.destroyed = true; | |
| this.emit('end'); | |
| }; | |
| /**** | |
| * | |
| */ | |
| var Network = function() { | |
| events.EventEmitter.call(this) | |
| // | |
| this.peers = {}; | |
| this.superPeers = {}; | |
| // | |
| this.stack = {}; | |
| // | |
| this.MessageChan | |
| this.DataChan | |
| this.SuperChan | |
| } | |
| /** | |
| * Inherits from EventEmitter | |
| */ | |
| util.inherits(Network, events.EventEmitter); | |
| /*** | |
| * @private | |
| */ | |
| Network.prototype.addPeer = function(socket) { | |
| var peer = new Peer(socket, this.MessageChan) | |
| socket.setNoDelay(); | |
| peer.discover(); | |
| /** | |
| * Once the discovery is finished, we can add it to the peers | |
| */ | |
| function onDiscovered() { | |
| console.info('Peer with id:' + peer.id + ' discovered'); | |
| this.peers[peer.id] = peer; | |
| var rpc = new RpcModule(peer) | |
| var self = this; | |
| var networkModule = { | |
| newPeer : function(id, port, host) { | |
| self.connectAsPeer(port, host) | |
| this.send('pass', true); | |
| }, | |
| ready : function(timeReady) { | |
| self.sendPeerInfo(peer); | |
| this.send('pass', true); | |
| }, | |
| peers : function() { | |
| this.set('peers', Object.keys(self.peers)) | |
| this.send('pass', true); | |
| } | |
| }; | |
| /** | |
| * | |
| */ | |
| rpc.expose('network', networkModule); | |
| /** | |
| * | |
| */ | |
| var keys = Object.keys(this.stack); | |
| for(var i = keys.length - 1; i >= 0; i--) { | |
| var name = keys[i] | |
| rpc.expose(name, self.stack[name]) | |
| }; | |
| /** | |
| * | |
| */ | |
| rpc.invoke('network.ready', [Date.now()], function(err, result) { | |
| if(err) | |
| throw err.message; | |
| console.log(result); | |
| self.emit('peer', peer) | |
| }); | |
| }; | |
| /** | |
| * Got a new peer | |
| */ | |
| function onNewMessagePeer(data) { | |
| this.connectAsPeer(data.port, data.host) | |
| } | |
| /** | |
| * If the socket suddenyl ends, then remove it immediately | |
| */ | |
| function onEnd() { | |
| util.info('Removing peer with id:' + peer.id + ' due to disconnect'); | |
| delete this.peers[peer.id]; | |
| } | |
| peer.on('end', onEnd.bind(this)); | |
| peer.once('discovered', onDiscovered.bind(this)); | |
| //peer.on('newPeer', onNewMessagePeer.bind(this)); | |
| }; | |
| Network.prototype.connectAsPeer = function(port, host) { | |
| var ids = Object.keys(this.peers); | |
| var peer; | |
| for(var i = ids.length - 1; i >= 0; i--) { | |
| peer = this.peers[ids[i]]; | |
| if(peer.host === host && peer.port === port) { | |
| console.warn('Already connected to peer on port:' + port + ' with host:' + host + '. Ignoring.'); | |
| return; | |
| } | |
| }; | |
| var socket = net.connect(port, host); | |
| /** | |
| * When we connect, register the socket as a peer | |
| */ | |
| function onConnect() { | |
| console.info('Peer connected on port:' + port + ' with host:' + host); | |
| this.addPeer(socket); | |
| } | |
| socket.on('connect', onConnect.bind(this)); | |
| }; | |
| /** | |
| * | |
| * | |
| * | |
| */ | |
| Network.prototype.createServer = function() { | |
| var server = net.Server(); | |
| /** | |
| * Listens for new connections and adds a peer for each one | |
| */ | |
| function onConnection(socket) { | |
| console.info('Peer connection request received'); | |
| this.addPeer(socket); | |
| } | |
| /** | |
| * Print some info once we are listening | |
| */ | |
| function onListen() { | |
| console.info('Servent listening on: ' + JSON.stringify(server.address())); | |
| } | |
| server.id = utils.uid(); | |
| server.on('connection', onConnection.bind(this)); | |
| server.on('listening', onListen.bind(this)); | |
| return this.MessageChan = server; | |
| } | |
| /** | |
| * Sends current servent peers information about the new peer | |
| * | |
| * @param {Peer} current The peer to send the information to | |
| */ | |
| Network.prototype.sendPeerInfo = function(current) { | |
| var peers = this.peers; | |
| var currentId = current.id; | |
| var ourId = this.id; | |
| Object.keys(peers).forEach(function(id, index) { | |
| var peer = peers[id]; | |
| if(!(peer.id === ourId || peer.id === currentId)) { | |
| peer.rpc.invoke('network.newPeer', [currentId, current.port, current.host], function(err, result) { | |
| if(err || !result.pass) { | |
| peer.destroy(); | |
| } | |
| }) | |
| } | |
| }); | |
| }; | |
| /** | |
| * | |
| * | |
| * | |
| */ | |
| Network.prototype.run = function(peerList) { | |
| var messageChan = netowrk.createServer(); | |
| var port = 45550; | |
| messageChan.on('error', function(err) { | |
| if(err.code === 'EADDRINUSE') { | |
| messageChan.listen(0); | |
| netowrk.connectAsPeer(port); | |
| } | |
| }) | |
| messageChan.listen(port); | |
| if(peerList) { | |
| if(Array.isArray(peerList)) { | |
| for(var i = peerList.length - 1; i >= 0; i--) { | |
| netowrk.connectAsPeer(peerList[i].port, peerList[i].host); | |
| }; | |
| } else { | |
| netowrk.connectAsPeer(peerList.port, peerList.host); | |
| } | |
| } | |
| } | |
| Network.prototype.use = function(module) { | |
| var name = module.name; | |
| var handler = module.handler; | |
| this.stack[name] = handler; | |
| } | |
| /** | |
| * | |
| * | |
| * | |
| * | |
| * | |
| * | |
| */ | |
| console.time('Boot time test') | |
| var netowrk = new Network(); | |
| var testModule = function() { | |
| var privateA = 'a'; | |
| var privateB = 'b'; | |
| return { | |
| name : 'test', //Name the module | |
| handler : {//can be an object or function | |
| test : function() { | |
| this.set('test', true) | |
| this.set('a', privateA) | |
| this.set('b', privateB).send() | |
| } | |
| } | |
| } | |
| } | |
| netowrk.use(testModule()); | |
| netowrk.run(); | |
| /* | |
| netowrk.run([{ | |
| port : 89898, | |
| host : 'a.wiyc.info' | |
| }, { | |
| port : 89898, | |
| host : 'b.wiyc.info' | |
| }]); | |
| netowrk.run({ | |
| port : 89898, | |
| host : 'wiyc.info' | |
| }); | |
| */ | |
| console.timeEnd('Boot time test') |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment