Last active
December 13, 2015 17:59
-
-
Save ishiduca/4952394 to your computer and use it in GitHub Desktop.
Re: 「はじめてのReadable Stream - 四角革命前夜」 http://d.hatena.ne.jp/sasaplus1/20130213/1360688911
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var counter = new CounterStream; | |
counter.on('error', function (err) { | |
console.error(err); | |
process.exit(1); | |
}); | |
counter.on('data', function (count) { | |
process.stdout.write(count + "\n"); | |
count = Number(count); | |
if (count === 5) { | |
counter.pause(); | |
setTimeout(function () { | |
console.log('restart'); | |
counter.resume(); | |
}, 3000); | |
console.log('wait 3sec'); | |
} | |
if (count === 10) counter.destroy(); | |
}); | |
counter.once('end', function () { | |
console.log('!! finish'); | |
}); | |
counter.resume(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var CounterStream = require('./read').CounterStream; | |
var counter = new CounterStream; | |
counter.on('error', function (err) { | |
console.error(err); | |
process.exit(1); | |
}); | |
counter.on('data', function (count) { | |
if (Number(count) === 10) counter.destroy(); | |
}); | |
counter.pipe(process.stdout); | |
counter.resume(); |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var util = require('util') | |
, stream = require('stream') | |
; | |
function CounterStream (interval) { | |
this.readable = true; | |
this.ended = false; | |
this.interval = interval || 500; | |
this.count = 0; | |
this.once('close', function () { | |
this.emit('end'); | |
}.bind(this)); | |
} | |
util.inherits(CounterStream, stream.Stream); | |
var cp = CounterStream.prototype; | |
cp.setEncoding = function () {}; | |
cp.resume = function () { | |
if (this.ended) { | |
return this.emit('error' | |
, new Error('this stream closed')); | |
} | |
if (this.readable) { | |
this.intervalID = setInterval( | |
this.inc.bind(this) | |
, this.interval | |
); | |
this.inc(); | |
} | |
this.readable = false; | |
}; | |
cp.pause = function () { | |
if (this.ended) { | |
return this.emit('error' | |
, new Error('this stream closed')); | |
} | |
clearInterval(this.intervalID); | |
delete this.intervalID; | |
this.readable = true; | |
}; | |
cp.destroy = function () { | |
this.readable = false; | |
this.ended = true; | |
clearInterval(this.intervalID); | |
delete this.intervalID; | |
this.emit('close'); | |
}; | |
cp.inc = function () { | |
this.emit('data', (++this.count).toString()); | |
}; |
./help.js
var QUnit = require('qunitjs')
, qunitTap = require('qunit-tap').qunitTap
, util = require('util')
;
qunitTap(QUnit, util.puts);
QUnit.init();
QUnit.config.updateRate = 0;
('deepEqual equal notDeepEqual notEqual notStrictEqual ok strictEqual throws test asyncTest').split(' ')
.forEach(function (keyword) {
global[keyword] = QUnit[keyword];
});
module.exports.QUnit = QUnit;
./xt/inc.js
var path = require('path')
, QUnit = require(path.join(__dirname, '../help')).QUnit
, CountStream = require(path.join(__dirname, '../counter')).CountStream
;
test('asyncTest', function () {
QUnit.stop();
var counter = new CountStream(1000);
counter.on('error', function (err) {
console.error(err);
ok(counter.ended, 'true === counter.ended');
ok(! counter.readable, 'false === counter.readable');
QUnit.start();
});
counter.on('data', function (n) {
if (5 === Number(n)) return counter.destroy();
equal(n, counter.count.toString(), "count: " + n);
});
counter.on('end', function () {
equal(true, counter.ended, '"true" === counter.ended');
equal(false, counter.readable, '"false" === counter.readable');
counter.resume(); // emit('error')
});
counter.resume();
});
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
read.js 一部書き直した。
pipe を使うと、writableStreamが drain 可能な状態の時点で「頻繁に」 resume を呼ぶケースがある。
ただ、その場合 readableStreamの readable === false であれば、resume しない。
なので、明示的にpauseを呼ばない限りは readable は falseにしておく。
蛇口の栓を閉じておくイメージ