Skip to content

Instantly share code, notes, and snippets.

@ishiduca
Created October 22, 2016 09:34
Show Gist options
  • Save ishiduca/46d36f6c2f729aedd903d49507935ad0 to your computer and use it in GitHub Desktop.
Save ishiduca/46d36f6c2f729aedd903d49507935ad0 to your computer and use it in GitHub Desktop.
JSON-RPC2.0 library(blue-frog) on Websocket(websocket-stream)
'use strict'
var http = require('http')
var path = require('path')
var ecstatic = require('ecstatic')(path.join(__dirname, 'static'))
var websocket = require('websocket-stream')
var response = require('blue-frog-core/response')
var router = require('../index')()
router.add('echo', (params, result, done) => {
process.nextTick(() => {
done(response(result.id, params))
})
})
router.add('broadcast', (params, result, done) => {
process.nextTick(() => {
done.broadcast(response(result.id, params))
})
})
var app = http.createServer(ecstatic)
var wss = websocket.createServer({server: app}, stream => {
router.setup(stream, (f, result, done) => {
f(result.params, result, done)
})
})
app.listen(9999, console.log.bind(console, 'server listen on port "9999"'))
'use strict'
var through = require('through2')
var rpc = require('blue-frog-stream')
var request = require('blue-frog-core/request')
var websocket = require('websocket-stream')
var uri = 'ws://' + location.hostname + ':' + location.port
window.onload = function () {
var ws; try {
ws = websocket(uri)
} catch (err) {
return console.error(err)
}
ws.once('close', function () {
console.log('ws.closed')
})
ws.once('end', function () {
console.log('ws.ended')
})
ws.once('finish', function () {
console.log('ws.finished')
})
ws.on('pipe', function (src) {
console.log('batch.pipe(ws)')
})
ws.on('unpipe', function (src) {
console.log('batch.unpipe(ws)')
})
ws.once('connect', function () {
console.log('ws.connected')
createBatch(ws).end(request('000', 'echo', {message: 'Hello'}))
})
ws.pipe(through(function (buf, _, done) {
var data = String(buf)
var rpcResponse; try {
rpcResponse = JSON.parse(data)
} catch (err) {
console.log(err)
return done()
}
var parse = new rpc.response.ParseStream(rpcResponse)
parse.on('error', function (err) {
console.error(err)
})
parse.once('end', function () {
console.log('parse.ended')
})
parse.pipe(through.obj(function (res, _, done) {
console.log(res)
done()
}))
done()
}))
setTimeout(function () {
var b = createBatch(ws)
b.write(request('010', 'echo', {message: 'echo test 1'}))
b.write(request('011', 'broadcast', {message: 'broadcast test 1'}))
b.end( request('012', 'echo', {message: 'echo test 2'}))
}, 1000)
setTimeout(function () {
var b = createBatch(ws)
b.write(request('020', 'broadcast', {message: 'broadcast test 1'}))
b.write(request('021', 'broadcast', {message: 'broadcast test 2'}))
b.end( request('022', 'broadcast', {message: 'broadcast test 3'}))
}, 2000)
setTimeout(function () {
var b = createBatch(ws)
b.write(request('030', 'echo', {message: 'echo test 1'}))
b.write(request('031', 'broadcast', {message: 'broadcast test 1'}))
b.end( request('032', 'echo', {message: 'echo test 2'}))
}, 3000)
}
function createBatch (ws) {
var batch = new rpc.request.BatchStream(true)
batch.once('finish', function () {
console.log('batch.finished')
})
batch.once('end', function () {
console.log('batch.ended')
batch.unpipe(ws)
})
batch.pipe(ws, {end: false})
batch.once('data', function (str) {
console.log(str)
})
return batch
}
var rpc = require('blue-frog-stream')
var xtend = require('xtend')
var response = require('blue-frog-core/response')
var rpcError = require('blue-frog-core/error')
var through = require('through2')
module.exports = Router
function Router () {
if (!(this instanceof Router)) return new Router
this.routes = {}
this.streams = []
}
Router.prototype.setup = function (stream, onGetResult) {
var me = this
this.streams.push(stream)
stream.once('close', function () {
me.streams = me.streams.filter(function (s) { return s !== stream})
console.log('ws.closed')
console.log('Router.streams.remove ws')
})
stream.once('finish', function () {
console.log('ws.finished')
})
stream.once('end', function () {
console.log('ws.ended')
})
var streams = this.streams
function broadcast (data) {
for (var i = 0, len = streams.length; i < len; i++) {
streams[i].write(data)
}
}
stream.pipe(through(function (buf, _, done) {
var isDone = false
var batch = new rpc.response.BatchStream(true)
batch.once('data', function (rpcResponseStr) {
console.log(rpcResponseStr)
done(null, rpcResponseStr)
isDone = true
this.push(null)
})
batch.once('end', function () {
if (! isDone) {
done()
this.push(null)
}
console.log('responseBatchStream.ended')
})
batch.once('finish', function () {
console.log('responseBatchStream.finished')
})
var data = String(buf)
var rpcRequest; try {
rpcRequest = JSON.parse(data)
} catch (_err) {
var err = getRpcParseError(_err, data)
return batch.end(err)
}
var parse = new rpc.request.ParseStream(rpcRequest)
parse.on('error', function (err) {
this.push(getRpcInvalidRequest(err))
})
parse.once('end', function () {
console.log('requestParseStream.ended')
})
parse.pipe(through.obj(function (req, _, done) {
if (req.error) return done(null, req)
var value = me.route(req.method)
if (! value) return done(null, getRpcMethodNotFound(req))
function end (data) {
done(null, data)
}
end.broadcast = function (data) {
broadcast(JSON.stringify(data))
end()
}
onGetResult(value, req, end)
}))
.pipe(batch)
}))
.pipe(stream)
}
Router.prototype.route = function (method) {
return this.routes[method]
}
Router.prototype.add = function (method, value) {
return (this.routes[method] = value)
}
function getRpcParseError (_err, data) {
var err = new SyntaxError('JSON parse error: ' + _err.message, _err)
err.data = data
return response.error(null, rpcError.ParseError(err))
}
function getRpcInvalidRequest (err) {
return response.error(err.id || null, rpcError.InvalidError(err))
}
function getRpcMethodNotFound (req) {
var err = rpcError.MethodNotFound('method "' + req.method + '" not found')
return response.error(req.id || null, err)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment