Skip to content

Instantly share code, notes, and snippets.

@ishiduca
Last active August 29, 2015 14:08
Show Gist options
  • Save ishiduca/59d91dff5ca099448f3e to your computer and use it in GitHub Desktop.
Save ishiduca/59d91dff5ca099448f3e to your computer and use it in GitHub Desktop.
pipeで繋いだStreamが例外を発行した場合の処理 ref: http://qiita.com/ishiduca/items/30ef3a4db72124ff9926
var test = require('tape').test
var stream = require('stream')
function setup (src) {
var queue = src.slice(0)
var rs = new stream.Readable({objectMode: true})
rs._read = function () {
this.push(queue.shift() || null)
}
var ws = new stream.Writable({objectMode: true})
ws.spy = []
ws._write = function (chnk, enc, done) {
this.spy.push(chnk)
done()
}
var ts = new stream.Transform({objectMode: true})
ts.spy = []
ts._transform = function (chnk, enc, done) {
if (chnk.sError)
return done(chnk.sError)
this.spy.push(chnk)
this.push(chnk)
done()
}
return {
rs: rs
, ws: ws
, ts: ts
}
}
test('transformStreamがエラーを発生させた場合 # リカバリーしない', function (t) {
var error = new Error('foo is not bar')
var src = [
{sError: false, value: 'abc'}
, {sError: error, value: 'def'}
, {sError: false, value: 'ghi'}
]
var ss = setup(src)
var rs = ss.rs
var ts = ss.ts
var ws = ss.ws
var spy = []
Object.keys(ss).forEach(function (name) {
ss[name].on('error', function (err) {
spy.push(name + ' error ' + err.message)
})
})
rs.on('end', function () { spy.push('rs end') })
ts.on('finish', function () { spy.push('ts finish') })
ts.on('end', function () { spy.push('ts end') })
ws.on('finish', function () { spy.push('ws finish') })
rs
.pipe(ts)
.pipe(ws)
setTimeout(function () {
t.deepEqual(rs._readableState.buffer, [{sError: false, value: 'ghi'}])
t.deepEqual(spy, ['ts error foo is not bar'])
t.deepEqual(ts.spy, [ {sError: false, value: 'abc'} ])
t.deepEqual(ws.spy, [ {sError: false, value: 'abc'} ])
t.end()
}, 1000)
})
test('transformStreamがエラーを発生させた場合 # on("error")の中でpipeを繋ぎ直す', function (t) {
var error = new Error('foo is not bar')
var src = [
{sError: false, value: 'abc'}
, {sError: error, value: 'def'}
, {sError: false, value: 'ghi'}
]
var ss = setup(src)
var rs = ss.rs
var ts = ss.ts
var ws = ss.ws
var spy = []
Object.keys(ss).forEach(function (name) {
ss[name].on('error', function (err) {
spy.push(name + ' error ' + err.message)
})
})
ts.on('error', function () {
rs.unpipe(ts)
ts.unpipe(ws)
rs.pipe(ts).pipe(ws)
})
rs.on('end', function () { spy.push('rs end') })
ts.on('finish', function () { spy.push('ts finish') })
ts.on('end', function () { spy.push('ts end') })
ws.on('finish', function () { spy.push('ws finish') })
rs
.pipe(ts)
.pipe(ws)
setTimeout(function () {
t.deepEqual(rs._readableState.buffer, [])
t.deepEqual(spy, ['ts error foo is not bar', 'rs end', 'ts finish', 'ts end', 'ws finish'])
t.deepEqual(ts.spy, [{sError: false, value: 'abc'}, {sError: false, value: 'ghi'}])
t.deepEqual(ws.spy, [{sError: false, value: 'abc'}, {sError: false, value: 'ghi'}])
t.end()
}, 1000)
})
ts.on('error', ...)
ts.on('unpipe', functon (src) {
src.pipe(this)
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment