Skip to content

Instantly share code, notes, and snippets.

@bennadel
Created September 3, 2014 12:33
Show Gist options
  • Save bennadel/a1784bcf60588666261e to your computer and use it in GitHub Desktop.
Save bennadel/a1784bcf60588666261e to your computer and use it in GitHub Desktop.
How Error Events Affect Piped Streams In Node.js
// Include module references.
var stream = require( "stream" );
var util = require( "util" );
var chalk = require( "chalk" );
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// I am a reabable stream in object-mode.
function Source() {
stream.Readable.call(
this,
{
objectMode: true
}
);
this._source = [ "What", "it", "be", "like?" ];
}
util.inherits( Source, stream.Readable );
Source.prototype._read = function( sizeIsIgnored ) {
// Emit an error every time we're asked to read data from the underlying source.
// --
// NOTE: You would never want to do this - I am only doing this to
// demonstrate the interplay between Readable streams and error events.
this.emit( "error", new Error( "StreamError" ) );
while ( this._source.length ) {
if ( ! this.push( this._source.shift() ) ) {
break;
}
}
if ( ! this._source.length ) {
this.push( null );
}
};
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// I am a writable stream in object-mode that may or may not emit errors (based on
// the instantiation arguments).
function Target( doEmitError ) {
stream.Writable.call(
this,
{
objectMode: true
}
);
this._emitError = ( doEmitError === true );
this._buffer = "";
this.on(
"finish",
function handleFinish() {
this.emit( "debug", this._buffer );
}
);
}
util.inherits( Target, stream.Writable );
Target.prototype._write = function( chunk, encoding, writeDone ) {
this._buffer += ( chunk + " " );
// Emit an error every time we go to write data into the running buffer.
// --
// NOTE: You would never want to do this - I am only doing this to
// demonstrate the interplay between Readable streams and error events.
if ( this._emitError ) {
this.emit( "error", new Error( "StreamError (" + chunk + ")" ) );
}
writeDone();
};
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// Create an instance of our readable stream.
var source = new Source();
var unsafeTarget = new Target( true );
var safeTarget = new Target( false );
// Debug errors on the source.
source.on(
"error",
function handleSourceError( error ) {
console.log( chalk.magenta( "Source error:", error.message ) );
}
);
// Debug errors on the target.
// --
// NOTE: We're only doing this for one of the targets since we know that the
// "safeTarget" will not emit any errors in this demo.
unsafeTarget.on(
"error",
function handleSourceError( error ) {
console.log( chalk.cyan( "Target error:", error.message ) );
}
);
// When the target emits the error, the source is going to disconnect itself from
// the destination.
unsafeTarget.on(
"unpipe",
function handleTargetUnpipe( stream ) {
console.log( chalk.yellow( "Unpiped source:", ( stream === source ) ) );
// At this point, the two streams have been disconnected. BUT, the two streams
// should continue to function 100% correctly. The errors have done nothing but
// interrupted the pipe-connection. As such, we can still write to the target.
this.write( "Written after pipe-break." );
this.end( "Ended." );
}
);
// Debug the state of the buffer when the UNSAFE target ends.
unsafeTarget.on(
"debug",
function handleUnsafeTargetDebug( buffer ) {
console.log( chalk.green( "Unsafe Target Buffer:", buffer ) );
}
);
// Debug the state of the buffer when the SAFE target ends.
safeTarget.on(
"debug",
function handleSafeTargetDebug( buffer ) {
console.log( chalk.green( "Safe Target Buffer:", buffer ) );
}
);
// ---------------------------------------------------------- //
// ---------------------------------------------------------- //
// Pipe the source into both the unsafe and safe targets.
source.pipe( unsafeTarget );
source.pipe( safeTarget );
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment