Created
August 21, 2018 16:11
-
-
Save mcollina/0676ee5f375c233ca147db8a9dbd0cf1 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
'use strict' | |
const { Readable, finished } = require('stream') | |
const { promisify } = require(util) | |
async function run (origin, dest) { | |
try { | |
const write = buildWrite(dest) | |
// This is an async iterator | |
for await (let chunk of origin) { | |
await write(chunk.toString().toUpperCase()) | |
} | |
await promisify(finished)(dest) | |
} catch (err) { | |
origin.destroy(err) | |
dest.destroy(err) | |
} | |
} | |
function buildWrite (stream) { | |
// This is a good way of wrapping stream.write into a Promise. | |
// We are waiting for a drain event to resolve, and we are wrapping | |
// the error event. A consumer should probably use finished to | |
// know if the stream has completed. | |
const streamError = null | |
stream.on('error', function (err) { | |
streamError = err | |
}) | |
return write | |
function write (chunk) { | |
if (streamError) { | |
return Promise.reject(streamError) | |
} | |
return new Promise(function (resolve, reject) { | |
const res = stream.write(chunk) | |
if (res) { | |
resolve() | |
} else { | |
stream.once('drain', resolve) | |
} | |
}) | |
} | |
} | |
// startup and run the pipeline | |
const origin = new Readable({ | |
read (n) { | |
this.push('hello') | |
this.push(' ') | |
this.push('world') | |
this.push(null) | |
} | |
}) | |
run(origin, process.stdout).catch(console.log) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment