Skip to content

Instantly share code, notes, and snippets.

@ishiduca
Last active January 8, 2016 12:43
Show Gist options
  • Save ishiduca/0e35aee3998af8843d87 to your computer and use it in GitHub Desktop.
Save ishiduca/0e35aee3998af8843d87 to your computer and use it in GitHub Desktop.
非同期の待ち合わせストリーム
var stream = require('readable-stream')
var inherits = require('inherits')
var eos = require('end-of-stream')
function Lazy () {
if (!(this instanceof Lazy)) return new Lazy
stream.Readable.call(this)
this.onWorks = []
this.count = 0
}
inherits(Lazy, stream.Readable)
Lazy.prototype._read = function () {}
Lazy.prototype.add = function (strm) {
this.onWorks.push(strm)
this.count += 1
var me = this
eos(strm, function (err) {
if (err)
me.emit('error', err)
else
me.push(String(me.onWorks.indexOf(strm)))
if (0 === (me.count -= 1)) {
me.push(null)
me.onWorks = [] // null
}
})
}
Lazy.prototype.wait = function (f) {
var payload = {id: (this.count += 1)}
this.onWorks.push(payload)
var me = this
f(function clear (err) {
if (err)
me.emit('error', err)
else
me.push(String(me.onWorks.indexOf(payload)))
if (0 === (me.count -= 1)) {
me.push(null)
me.onWorks = []
}
})
}
module.exports = Lazy
{
"name": "clazy",
"version": "0.0.1",
"description": "",
"main": "index.js",
"scripts": {
"test": "tape t/*"
},
"author": "[email protected]",
"license": "MIT",
"dependencies": {
"end-of-stream": "^1.1.0",
"inherits": "^2.0.1",
"readable-stream": "^2.0.5"
},
"devDependencies": {
"tape": "^4.4.0",
"through2": "^2.0.0"
}
}
'use strict'
var test = require('tape')
var Lazy = require('../index')
var stream = require('readable-stream')
test('var lazy = new Lazy', function (t) {
var lazy = new Lazy
t.ok(lazy, 'var lazy = new Lazy')
t.ok(lazy._readableState, 'exists lazy._readableState')
t.end()
})
test('lazy.add(readableStream)', function (t) {
var lazy = new Lazy()
var spy = []
lazy.on('data', function (count) {
spy.push(Number(count))
})
lazy.once('end', function () {
t.ok(1, 'lazy.emit "end"')
t.is(spy.length, 9, 'lazy.emit "data" 9 times')
t.deepEqual(spy.sort(), [0,1,2,3,4,5,6,7,8])
t.end()
})
;('123456789').split('').forEach(function (n) {
var rs = new stream.Readable
rs._read = function () {}
rs.on('data', function (data) {/*dummy*/})
lazy.add(rs)
rs.push(n)
rs.push(null)
})
})
test('lazy.add(readableStream) with error', function (t) {
var lazy = new Lazy()
var spy = []
var ers = []
lazy.on('data', function (count) {
spy.push(Number(count))
})
lazy.on('error', function (err) {
ers.push(Number(err.message))
})
lazy.once('end', function () {
t.ok(1, 'lazy.emit "end"')
t.is(spy.length, 4, 'lazy.emit "data" 4 times')
t.is(ers.length, 5, 'lazy.emit "error" 5 times')
t.deepEqual(spy.sort(), [1,3,5,7])
t.deepEqual(ers.sort(), [1,3,5,7,9])
t.end()
})
;('123456789').split('').forEach(function (n) {
var rs = new stream.Readable
rs._read = function () {}
rs.on('data', function (data) {/*dummy*/})
lazy.add(rs)
process.nextTick(function () {
if (n % 2) return rs.emit('error', new Error(n))
rs.push(n)
rs.push(null)
})
})
})
test('lazy.add(writableStream)', function (t) {
var lazy = Lazy()
var spy = []
lazy.on('data', function (count) {
spy.push(Number(count))
})
lazy.once('end', function () {
t.ok(1, 'lazy.emit "end"')
t.is(spy.length, 9, 'lazy.emit "data" 9 times')
t.deepEqual(spy.sort(), [0,1,2,3,4,5,6,7,8])
t.end()
})
;('123456789').split('').forEach(function (n) {
var ws = new stream.Writable
ws._write = function (data, enc, done) {
done(null, data)
}
lazy.add(ws)
ws.end(n)
})
})
test('lazy.add(writableStream) with error', function (t) {
var lazy = Lazy()
var spy = []
var ers = []
lazy.on('data', function (count) {
spy.push(Number(count))
})
lazy.on('error', function (err) {
ers.push(Number(err.message))
})
lazy.once('end', function () {
t.ok(1, 'lazy.emit "end"')
t.is(spy.length, 4, 'lazy.emit "data" 4 times')
t.is(ers.length, 5, 'lazy.emit "error" 5 times')
t.deepEqual(spy.sort(), [1,3,5,7])
t.deepEqual(ers.sort(), [1,3,5,7,9])
t.end()
})
;('123456789').split('').forEach(function (n) {
var ws = new stream.Writable
ws._write = function (data, enc, done) {
if (data % 2) return done(new Error(data))
done(null, data)
}
lazy.add(ws)
process.nextTick(function () {
ws.end(n)
})
})
})
'use strict'
var test = require('tape')
var Lazy = require('../index')
test('lazy.wait(function (clear) { ... })', function (t) {
var lazy = new Lazy
var spy = []
lazy
.on('data', function (count) {
spy.push(String(count))
})
.once('end', function () {
t.ok(1, 'lazy.emit "end"')
t.is(spy.length, 4, 'lazy.emit "data" 4 time')
t.end()
})
.on('error', function (err) {
console.error(err)
})
;[4,3,2,1].forEach(function (n) {
lazy.wait(function (clear) {
setTimeout(function () {
clear()
}, n * 100)
})
})
})
test('lazy.wait(function (clear) { ... }) with error', function (t) {
var lazy = new Lazy
var spy = []
var ers = []
lazy
.on('data', function (count) {
spy.push(String(count))
})
.once('end', function () {
t.ok(1, 'lazy.emit "end"')
t.is(spy.length, 2, 'lazy.emit "data" 2 times')
t.is(ers.length, 2, 'lazy.emit "error" 2 times')
t.end()
})
.on('error', function (err) {
ers.push(String(err.message))
})
;[4,3,2,1].forEach(function (n) {
lazy.wait(function (clear) {
setTimeout(function () {
;(n % 2) ? clear() : clear(new Error(String(n)))
}, n * 100)
})
})
})
tape t/*
TAP version 13
# var lazy = new Lazy
ok 1 var lazy = new Lazy
ok 2 exists lazy._readableState
# lazy.add(readableStream)
ok 3 lazy.emit "end"
ok 4 lazy.emit "data" 9 times
ok 5 should be equivalent
# lazy.add(readableStream) with error
ok 6 lazy.emit "end"
ok 7 lazy.emit "data" 4 times
ok 8 lazy.emit "error" 5 times
ok 9 should be equivalent
ok 10 should be equivalent
# lazy.add(writableStream)
ok 11 lazy.emit "end"
ok 12 lazy.emit "data" 9 times
ok 13 should be equivalent
# lazy.add(writableStream) with error
ok 14 lazy.emit "end"
ok 15 lazy.emit "data" 4 times
ok 16 lazy.emit "error" 5 times
ok 17 should be equivalent
ok 18 should be equivalent
# lazy.wait(function (clear) { ... })
ok 19 lazy.emit "end"
ok 20 lazy.emit "data" 4 time
# lazy.wait(function (clear) { ... }) with error
ok 21 lazy.emit "end"
ok 22 lazy.emit "data" 2 times
ok 23 lazy.emit "error" 2 times
1..23
# tests 23
# pass 23
# ok
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment