Skip to content

Instantly share code, notes, and snippets.

@ishiduca
Last active December 13, 2015 17:59
Show Gist options
  • Save ishiduca/4952394 to your computer and use it in GitHub Desktop.
Save ishiduca/4952394 to your computer and use it in GitHub Desktop.
Re: 「はじめてのReadable Stream - 四角革命前夜」 http://d.hatena.ne.jp/sasaplus1/20130213/1360688911
// Demo script: prints every count, pauses the stream for three seconds when
// the count reaches 5, and destroys it when the count reaches 10.
//
// The stream class has to be loaded explicitly; without this require the
// script fails on the very first statement because CounterStream is not
// defined in this file.
var CounterStream = require('./read').CounterStream;

var counter = new CounterStream;

// Any stream error is fatal for this demo.
counter.on('error', function (err) {
    console.error(err);
    process.exit(1);
});

counter.on('data', function (count) {
    process.stdout.write(count + "\n");

    // The stream emits the count as a string; compare it numerically.
    var n = Number(count);

    if (n === 5) {
        counter.pause();
        setTimeout(function () {
            console.log('restart');
            counter.resume();
        }, 3000);
        console.log('wait 3sec');
    }

    if (n === 10) {
        counter.destroy();
    }
});

// 'end' is emitted by the stream once it has been destroyed.
counter.once('end', function () {
    console.log('!! finish');
});

// Start counting.
counter.resume();
// Demo script: pipes the counter into stdout and destroys the stream once
// the emitted count reaches 10.
var CounterStream = require('./read').CounterStream;

var counter = new CounterStream;

// Report the error and abort the process with a non-zero exit code.
counter.on('error', function reportAndExit (err) {
    console.error(err);
    process.exit(1);
});

// Counts arrive as strings, so convert before comparing.
counter.on('data', function stopAtTen (count) {
    if (10 === Number(count)) {
        counter.destroy();
    }
});

counter.pipe(process.stdout);

// Start counting.
counter.resume();
var util = require('util')
, stream = require('stream')
;
/**
 * Legacy-style (event based) readable stream that emits an incrementing
 * counter, as a string, once immediately on resume() and then on every
 * interval tick.
 *
 * State flags:
 *   - readable: true while the stream is stopped (never started, or paused)
 *               and may be started by resume(); false while it is ticking
 *               and after destroy(). Per the author's note, this keeps
 *               repeated resume() calls (e.g. issued by pipe() on drain)
 *               from starting a second timer.
 *   - ended:    true once destroy() has been called.
 *
 * Events: 'data' (count as string), 'close' and 'end' (on destroy),
 * 'error' (when resume()/pause() is called after destroy()).
 *
 * @param {number} [interval=500] Tick period in milliseconds. Any falsy
 *     value (including 0) falls back to 500.
 */
function CounterStream (interval) {
    // Run the parent constructor so the emitter state is initialised the
    // same way as for every other Stream subclass.
    stream.Stream.call(this);

    this.readable = true;
    this.ended    = false;
    this.interval = interval || 500;
    this.count    = 0;

    // A destroyed stream reports 'end' exactly once, right after 'close'.
    this.once('close', function () {
        this.emit('end');
    }.bind(this));
}
util.inherits(CounterStream, stream.Stream);

var cp = CounterStream.prototype;

/**
 * No-op: the stream always emits strings.
 */
cp.setEncoding = function () {};

/**
 * Starts (or restarts) counting. Emits one count synchronously and then one
 * per interval tick. Does nothing if the stream is already ticking.
 * Emits 'error' if the stream has been destroyed.
 *
 * @returns {boolean|undefined} The result of emit('error', ...) when the
 *     stream is already destroyed, otherwise undefined.
 */
cp.resume = function () {
    if (this.ended) {
        return this.emit('error', new Error('this stream closed'));
    }

    if (this.readable) {
        // Update the state *before* emitting: a 'data' listener may call
        // pause() synchronously from the inc() below, and its state change
        // must not be overwritten afterwards (that would leave the stream
        // stopped but impossible to resume).
        this.readable   = false;
        this.intervalID = setInterval(this.inc.bind(this), this.interval);
        this.inc();
    }
};

/**
 * Stops the timer and marks the stream as resumable again.
 * Emits 'error' if the stream has been destroyed.
 *
 * @returns {boolean|undefined} The result of emit('error', ...) when the
 *     stream is already destroyed, otherwise undefined.
 */
cp.pause = function () {
    if (this.ended) {
        return this.emit('error', new Error('this stream closed'));
    }

    clearInterval(this.intervalID);
    delete this.intervalID;
    this.readable = true;
};

/**
 * Permanently stops the stream and emits 'close' (which in turn triggers
 * 'end'). Calling it again on an already destroyed stream is a no-op, so
 * 'close' is emitted only once.
 */
cp.destroy = function () {
    if (this.ended) {
        return;
    }

    this.readable = false;
    this.ended    = true;
    clearInterval(this.intervalID);
    delete this.intervalID;
    this.emit('close');
};

/**
 * Increments the counter and emits the new value as a 'data' event. The
 * value is emitted as a string.
 */
cp.inc = function () {
    this.emit('data', (++this.count).toString());
};
@ishiduca
Copy link
Author

read.js 一部書き直した。
pipe を使うと、writableStreamが drain 可能な状態の時点で「頻繁に」 resume を呼ぶケースがある。
ただ、その場合 readableStreamの readable === false であれば、resume しない。
なので、明示的にpauseを呼ばない限りは readable は falseにしておく。
蛇口の栓を閉じておくイメージ

@ishiduca
Copy link
Author

./help.js

// Test bootstrap: wires QUnit to TAP output and exposes the QUnit assertion
// and test functions as globals so test files can call them directly.
var QUnit    = require('qunitjs')
,   qunitTap = require('qunit-tap').qunitTap
,   util     = require('util')
;

// util.puts is deprecated and no longer exists in current Node.js releases;
// passing it unconditionally would hand `undefined` to qunit-tap there.
// Keep using it where it is available and fall back to console.log.
qunitTap(QUnit, typeof util.puts === 'function'
    ? util.puts
    : function () { console.log.apply(console, arguments); }
);
QUnit.init();
QUnit.config.updateRate = 0;

// Copy the commonly used QUnit functions onto the global object.
('deepEqual equal notDeepEqual notEqual notStrictEqual ok strictEqual throws test asyncTest').split(' ')
.forEach(function (keyword) {
    global[keyword] = QUnit[keyword];
});

module.exports.QUnit = QUnit;

./xt/inc.js

var path = require('path');
var QUnit = require(path.join(__dirname, '../help')).QUnit;
var CountStream = require(path.join(__dirname, '../counter')).CountStream;

// Drives a counter until it reaches 5, destroys it, and then checks that
// resuming the destroyed stream reports an error.
test('asyncTest', function () {
    // The assertions run from timer callbacks, so hold the runner until the
    // error handler releases it.
    QUnit.stop();

    var counter = new CountStream(1000);

    // Reached via the resume() issued from the 'end' handler.
    function onError (err) {
        console.error(err);

        ok(counter.ended,      'true  === counter.ended');
        ok(! counter.readable, 'false === counter.readable');

        QUnit.start();
    }

    // Each emitted value must match the stream's own counter; the fifth
    // value tears the stream down instead of being checked.
    function onData (n) {
        if (Number(n) === 5) {
            counter.destroy();
            return;
        }

        equal(n, counter.count.toString(), "count: " + n);
    }

    // After destroy(): verify the final state, then provoke the error.
    function onEnd () {
        equal(true,  counter.ended,    '"true"  === counter.ended');
        equal(false, counter.readable, '"false" === counter.readable');

        counter.resume(); // emit('error')
    }

    counter.on('error', onError);
    counter.on('data', onData);
    counter.on('end', onEnd);

    counter.resume();
});

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment