Created
September 3, 2014 12:33
-
-
Save bennadel/a1784bcf60588666261e to your computer and use it in GitHub Desktop.
How Error Events Affect Piped Streams In Node.js
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Include module references. | |
var stream = require( "stream" ); | |
var util = require( "util" ); | |
var chalk = require( "chalk" ); | |
// ---------------------------------------------------------- // | |
// ---------------------------------------------------------- // | |
// I am a reabable stream in object-mode. | |
function Source() { | |
stream.Readable.call( | |
this, | |
{ | |
objectMode: true | |
} | |
); | |
this._source = [ "What", "it", "be", "like?" ]; | |
} | |
util.inherits( Source, stream.Readable ); | |
Source.prototype._read = function( sizeIsIgnored ) { | |
// Emit an error every time we're asked to read data from the underlying source. | |
// -- | |
// NOTE: You would never want to do this - I am only doing this to | |
// demonstrate the interplay between Readable streams and error events. | |
this.emit( "error", new Error( "StreamError" ) ); | |
while ( this._source.length ) { | |
if ( ! this.push( this._source.shift() ) ) { | |
break; | |
} | |
} | |
if ( ! this._source.length ) { | |
this.push( null ); | |
} | |
}; | |
// ---------------------------------------------------------- // | |
// ---------------------------------------------------------- // | |
// I am a writable stream in object-mode that may or may not emit errors (based on | |
// the instantiation arguments). | |
function Target( doEmitError ) { | |
stream.Writable.call( | |
this, | |
{ | |
objectMode: true | |
} | |
); | |
this._emitError = ( doEmitError === true ); | |
this._buffer = ""; | |
this.on( | |
"finish", | |
function handleFinish() { | |
this.emit( "debug", this._buffer ); | |
} | |
); | |
} | |
util.inherits( Target, stream.Writable ); | |
Target.prototype._write = function( chunk, encoding, writeDone ) { | |
this._buffer += ( chunk + " " ); | |
// Emit an error every time we go to write data into the running buffer. | |
// -- | |
// NOTE: You would never want to do this - I am only doing this to | |
// demonstrate the interplay between Readable streams and error events. | |
if ( this._emitError ) { | |
this.emit( "error", new Error( "StreamError (" + chunk + ")" ) ); | |
} | |
writeDone(); | |
}; | |
// ---------------------------------------------------------- // | |
// ---------------------------------------------------------- // | |
// Create an instance of our readable stream. | |
var source = new Source(); | |
var unsafeTarget = new Target( true ); | |
var safeTarget = new Target( false ); | |
// Debug errors on the source. | |
source.on( | |
"error", | |
function handleSourceError( error ) { | |
console.log( chalk.magenta( "Source error:", error.message ) ); | |
} | |
); | |
// Debug errors on the target. | |
// -- | |
// NOTE: We're only doing this for one of the targets since we know that the | |
// "safeTarget" will not emit any errors in this demo. | |
unsafeTarget.on( | |
"error", | |
function handleSourceError( error ) { | |
console.log( chalk.cyan( "Target error:", error.message ) ); | |
} | |
); | |
// When the target emits the error, the source is going to disconnect itself from | |
// the destination. | |
unsafeTarget.on( | |
"unpipe", | |
function handleTargetUnpipe( stream ) { | |
console.log( chalk.yellow( "Unpiped source:", ( stream === source ) ) ); | |
// At this point, the two streams have been disconnected. BUT, the two streams | |
// should continue to function 100% correctly. The errors have done nothing but | |
// interrupted the pipe-connection. As such, we can still write to the target. | |
this.write( "Written after pipe-break." ); | |
this.end( "Ended." ); | |
} | |
); | |
// Debug the state of the buffer when the UNSAFE target ends. | |
unsafeTarget.on( | |
"debug", | |
function handleUnsafeTargetDebug( buffer ) { | |
console.log( chalk.green( "Unsafe Target Buffer:", buffer ) ); | |
} | |
); | |
// Debug the state of the buffer when the SAFE target ends. | |
safeTarget.on( | |
"debug", | |
function handleSafeTargetDebug( buffer ) { | |
console.log( chalk.green( "Safe Target Buffer:", buffer ) ); | |
} | |
); | |
// ---------------------------------------------------------- // | |
// ---------------------------------------------------------- // | |
// Pipe the source into both the unsafe and safe targets. | |
source.pipe( unsafeTarget ); | |
source.pipe( safeTarget ); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment