Created
October 22, 2016 09:34
-
-
Save ishiduca/46d36f6c2f729aedd903d49507935ad0 to your computer and use it in GitHub Desktop.
JSON-RPC 2.0 library (blue-frog) over WebSocket (websocket-stream)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict' | |
var http = require('http') | |
var path = require('path') | |
var ecstatic = require('ecstatic')(path.join(__dirname, 'static')) | |
var websocket = require('websocket-stream') | |
var response = require('blue-frog-core/response') | |
var router = require('../index')() | |
// JSON-RPC method "echo": answers the caller with the params it sent.
router.add('echo', function echo (params, result, done) {
  // The reply is deferred to the next tick rather than sent synchronously.
  process.nextTick(function reply () {
    var rpcResponse = response(result.id, params)
    done(rpcResponse)
  })
})
// JSON-RPC method "broadcast": hands the response to `done.broadcast`
// instead of `done`, so the router decides who receives it.
router.add('broadcast', function broadcast (params, result, done) {
  // The reply is deferred to the next tick rather than sent synchronously.
  process.nextTick(function reply () {
    var rpcResponse = response(result.id, params)
    done.broadcast(rpcResponse)
  })
})
// HTTP server that serves the static client through ecstatic; the websocket
// server below is attached to the same HTTP server.
var app = http.createServer(ecstatic)
var wss = websocket.createServer({server: app}, stream => {
  // Without an 'error' listener, an error on a single connection is thrown
  // as an uncaught exception and takes the whole process down.
  stream.on('error', err => {
    console.error(err)
  })
  // For every routed request the router passes the registered handler `f`;
  // invoke it with the request params, the request itself and the reply
  // callback.
  router.setup(stream, (f, result, done) => {
    f(result.params, result, done)
  })
})
app.listen(9999, console.log.bind(console, 'server listen on port "9999"'))
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict' | |
var through = require('through2') | |
var rpc = require('blue-frog-stream') | |
var request = require('blue-frog-core/request') | |
var websocket = require('websocket-stream') | |
// Websocket endpoint derived from the page's own origin.
// - `location.host` already carries ":port" when a non-default port is used,
//   so a page on the default port no longer produces "ws://host:".
// - Pages loaded over HTTPS must use "wss://"; browsers block plain "ws://"
//   from secure contexts.
var uri = (location.protocol === 'https:' ? 'wss://' : 'ws://') + location.host
// Browser demo entry point: opens the websocket, logs its lifecycle, prints
// every JSON-RPC response that arrives and fires a few sample batches.
window.onload = function () {
  var ws
  try {
    ws = websocket(uri)
  } catch (err) {
    return console.error(err)
  }

  // One-shot lifecycle logging.
  var lifecycle = [
    ['close', 'ws.closed'],
    ['end', 'ws.ended'],
    ['finish', 'ws.finished']
  ]
  lifecycle.forEach(function (pair) {
    ws.once(pair[0], function () {
      console.log(pair[1])
    })
  })

  // Logged each time a batch stream is piped into / unpiped from the socket.
  var piping = [
    ['pipe', 'batch.pipe(ws)'],
    ['unpipe', 'batch.unpipe(ws)']
  ]
  piping.forEach(function (pair) {
    ws.on(pair[0], function (src) {
      console.log(pair[1])
    })
  })

  // Send a first single-request batch as soon as the socket is connected.
  ws.once('connect', function () {
    console.log('ws.connected')
    createBatch(ws).end(request('000', 'echo', {message: 'Hello'}))
  })

  // Every incoming chunk is parsed as JSON and fed through a response
  // ParseStream; each parsed response object is logged.
  function onMessage (buf, _, done) {
    var text = String(buf)
    var rpcResponse
    try {
      rpcResponse = JSON.parse(text)
    } catch (err) {
      console.log(err)
      return done()
    }

    var parser = new rpc.response.ParseStream(rpcResponse)
    parser.on('error', function (err) {
      console.error(err)
    })
    parser.once('end', function () {
      console.log('parse.ended')
    })
    parser.pipe(through.obj(function (res, _, next) {
      console.log(res)
      next()
    }))

    done()
  }
  ws.pipe(through(onMessage))

  // Sample batches: [id, method, message]; the last entry of each batch is
  // sent with end(), the others with write().
  var schedule = [
    {
      delay: 1000,
      calls: [
        ['010', 'echo', 'echo test 1'],
        ['011', 'broadcast', 'broadcast test 1'],
        ['012', 'echo', 'echo test 2']
      ]
    },
    {
      delay: 2000,
      calls: [
        ['020', 'broadcast', 'broadcast test 1'],
        ['021', 'broadcast', 'broadcast test 2'],
        ['022', 'broadcast', 'broadcast test 3']
      ]
    },
    {
      delay: 3000,
      calls: [
        ['030', 'echo', 'echo test 1'],
        ['031', 'broadcast', 'broadcast test 1'],
        ['032', 'echo', 'echo test 2']
      ]
    }
  ]
  schedule.forEach(function (entry) {
    setTimeout(function () {
      var batch = createBatch(ws)
      var lastIndex = entry.calls.length - 1
      entry.calls.forEach(function (call, index) {
        var req = request(call[0], call[1], {message: call[2]})
        if (index === lastIndex) {
          batch.end(req)
        } else {
          batch.write(req)
        }
      })
    }, entry.delay)
  })
}
/**
 * Create a request BatchStream wired to the given websocket stream.
 *
 * The batch is piped into `ws` with `{end: false}`, so finishing a batch
 * does not end the socket, and it is unpiped again once the batch has ended.
 * The first chunk the batch emits is logged.
 *
 * @param {Object} ws - websocket stream the batch output is written to
 * @returns {Object} the new batch stream; write requests to it, then end it
 */
function createBatch (ws) {
  var stream = new rpc.request.BatchStream(true)

  stream.once('finish', function onFinish () {
    console.log('batch.finished')
  })

  stream.once('end', function onEnd () {
    console.log('batch.ended')
    stream.unpipe(ws)
  })

  stream.pipe(ws, {end: false})

  stream.once('data', function onData (payload) {
    console.log(payload)
  })

  return stream
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var rpc = require('blue-frog-stream') | |
var xtend = require('xtend') | |
var response = require('blue-frog-core/response') | |
var rpcError = require('blue-frog-core/error') | |
var through = require('through2') | |
module.exports = Router | |
/**
 * Router constructor; may be called with or without `new`.
 *
 * `routes` maps a method name to the value registered with `add()`.
 * `streams` lists the streams currently attached through `setup()`.
 */
function Router () {
  if (!(this instanceof Router)) return new Router()
  // Prototype-less dictionary: method names are looked up with strings that
  // come from clients, so names such as "constructor" or "toString" must not
  // resolve to members inherited from Object.prototype.
  this.routes = Object.create(null)
  this.streams = []
}
/**
 * Attach a duplex stream (one connection) to the router.
 *
 * Every chunk read from `stream` is parsed as JSON, run through a request
 * ParseStream, dispatched per request, and the collected responses are
 * written back to `stream` as one string.
 *
 * @param {Object} stream - duplex stream; it is read from and written to
 * @param {Function} onGetResult - called as `onGetResult(value, req, end)`
 *   for every request whose method has a registered route. `value` is what
 *   was registered with `add()`. Call `end(response)` to answer the caller
 *   only, or `end.broadcast(response)` to write the JSON-encoded response to
 *   every attached stream.
 */
Router.prototype.setup = function (stream, onGetResult) {
  var me = this
  this.streams.push(stream)

  stream.once('close', function () {
    me.streams = me.streams.filter(function (s) { return s !== stream })
    console.log('ws.closed')
    console.log('Router.streams.remove ws')
  })
  stream.once('finish', function () {
    console.log('ws.finished')
  })
  stream.once('end', function () {
    console.log('ws.ended')
  })

  function broadcast (data) {
    // Read the list through `me` on every call. The 'close' handler above
    // replaces `me.streams` with a new array, so a reference captured once
    // at setup time would keep pointing at a stale list that still holds
    // closed streams and misses streams attached later.
    var targets = me.streams.slice()
    for (var i = 0, len = targets.length; i < len; i++) {
      targets[i].write(data)
    }
  }

  stream.pipe(through(function (buf, _, done) {
    // `done` must be called exactly once per incoming chunk; `isDone`
    // records that the 'data' handler already did so.
    var isDone = false
    var batch = new rpc.response.BatchStream(true)

    batch.once('data', function (rpcResponseStr) {
      console.log(rpcResponseStr)
      done(null, rpcResponseStr)
      isDone = true
      // NOTE(review): `this` is `batch` here (plain function listener), not
      // the outer through stream - confirm that ending `batch` is intended.
      this.push(null)
    })
    batch.once('end', function () {
      // No response was produced for this chunk: release the transform
      // without pushing anything.
      if (!isDone) {
        done()
        this.push(null)
      }
      console.log('responseBatchStream.ended')
    })
    batch.once('finish', function () {
      console.log('responseBatchStream.finished')
    })

    var data = String(buf)
    var rpcRequest
    try {
      rpcRequest = JSON.parse(data)
    } catch (_err) {
      // Invalid JSON: answer with a single parse-error response.
      var err = getRpcParseError(_err, data)
      return batch.end(err)
    }

    var parse = new rpc.request.ParseStream(rpcRequest)
    parse.on('error', function (err) {
      // `this` is `parse`: the error is converted into an error response
      // object and pushed downstream in place of the invalid request.
      this.push(getRpcInvalidRequest(err))
    })
    parse.once('end', function () {
      console.log('requestParseStream.ended')
    })

    parse.pipe(through.obj(function (req, _, done) {
      // Objects that already carry an error are forwarded unchanged.
      if (req.error) return done(null, req)

      var value = me.route(req.method)
      if (!value) return done(null, getRpcMethodNotFound(req))

      function end (data) {
        done(null, data)
      }
      end.broadcast = function (data) {
        broadcast(JSON.stringify(data))
        // Nothing is pushed into the per-caller batch for a broadcast.
        end()
      }

      onGetResult(value, req, end)
    }))
      .pipe(batch)
  }))
    .pipe(stream)
}
/**
 * Look up the value registered for a method name.
 *
 * Only own properties of `routes` count. Method names arrive from clients,
 * so names like "constructor", "toString" or "__proto__" must not resolve to
 * members inherited from Object.prototype.
 *
 * @param {string} method - JSON-RPC method name
 * @returns {*} the registered value, or undefined when none is registered
 */
Router.prototype.route = function (method) {
  if (!Object.prototype.hasOwnProperty.call(this.routes, method)) {
    return undefined
  }
  return this.routes[method]
}
/**
 * Register a value (normally a handler function) under a method name.
 *
 * The entry is defined as an own data property, so a name such as
 * "__proto__" is stored as an ordinary key instead of going through the
 * prototype setter that a plain assignment would trigger.
 *
 * @param {string} method - JSON-RPC method name
 * @param {*} value - value later returned by `route(method)`
 * @returns {*} the registered value
 */
Router.prototype.add = function (method, value) {
  Object.defineProperty(this.routes, method, {
    value: value,
    writable: true,
    enumerable: true,
    configurable: true
  })
  return value
}
/**
 * Build the JSON-RPC error response for a request body that is not valid
 * JSON. The id is null because no request id can be read from the body.
 *
 * @param {Error} _err - error thrown by JSON.parse
 * @param {string} data - the raw text that failed to parse
 * @returns {Object} error response built by `response.error`
 */
function getRpcParseError (_err, data) {
  // The second constructor argument is an options object; the original
  // error belongs in its `cause` field rather than being passed directly.
  var err = new SyntaxError('JSON parse error: ' + _err.message, {cause: _err})
  err.data = data
  return response.error(null, rpcError.ParseError(err))
}
/**
 * Build the JSON-RPC error response for a request that failed validation.
 *
 * @param {Error} err - validation error; its `id`, when present, is echoed
 * @returns {Object} error response built by `response.error`
 */
function getRpcInvalidRequest (err) {
  // Fall back to null only when the id is really absent: 0 and '' are
  // falsy but still have to be echoed back to the caller.
  var id = (err.id === undefined || err.id === null) ? null : err.id
  return response.error(id, rpcError.InvalidError(err))
}
/**
 * Build the JSON-RPC error response for a request whose method has no
 * registered route.
 *
 * @param {Object} req - the request; `method` and `id` are read from it
 * @returns {Object} error response built by `response.error`
 */
function getRpcMethodNotFound (req) {
  var err = rpcError.MethodNotFound('method "' + req.method + '" not found')
  // Fall back to null only when the id is really absent: 0 and '' are
  // falsy but still have to be echoed back to the caller.
  var id = (req.id === undefined || req.id === null) ? null : req.id
  return response.error(id, err)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment