Skip to content

Instantly share code, notes, and snippets.

@mcollina
Created August 21, 2018 16:11
Show Gist options
  • Save mcollina/0676ee5f375c233ca147db8a9dbd0cf1 to your computer and use it in GitHub Desktop.
Save mcollina/0676ee5f375c233ca147db8a9dbd0cf1 to your computer and use it in GitHub Desktop.
'use strict'
const { Readable, finished } = require('stream')
const { promisify } = require(util)
/**
 * Copies every chunk of `origin` into `dest`, upper-casing it on the way.
 *
 * Chunks are written one at a time through the writer returned by
 * buildWrite(dest); afterwards the function waits until `finished` reports
 * that `dest` is done. It never calls dest.end() itself.
 *
 * If anything throws or rejects along the way, both streams are destroyed
 * with that error. The error is not rethrown.
 *
 * @param {object} origin - async-iterable source that also exposes destroy()
 * @param {object} dest - destination stream handed to buildWrite and finished
 * @returns {Promise<void>}
 */
async function run (origin, dest) {
  try {
    const send = buildWrite(dest)
    // Async iteration: the next chunk is only requested once the previous
    // write has settled.
    for await (const piece of origin) {
      const text = piece.toString()
      await send(text.toUpperCase())
    }
    const whenFinished = promisify(finished)
    await whenFinished(dest)
  } catch (failure) {
    // Tear down both ends with the same error.
    origin.destroy(failure)
    dest.destroy(failure)
  }
}
/**
 * Wraps stream.write() in a Promise-returning function.
 *
 * The returned write(chunk):
 *  - rejects immediately if the stream has already emitted 'error';
 *  - resolves right away when stream.write() returns a truthy value;
 *  - otherwise waits for the next 'drain' event before resolving, and
 *    rejects if the stream emits 'error' while waiting.
 *
 * A consumer should still use `finished` to learn when the stream has
 * completed; this helper only covers individual writes.
 *
 * @param {object} stream - object exposing write(), on(), once() and
 *   removeListener() (e.g. a Writable stream)
 * @returns {function(*): Promise<void>} promise-based write function
 */
function buildWrite (stream) {
  // Must be reassignable: the error listener below stores the failure here.
  let streamError = null
  stream.on('error', function (err) {
    streamError = err
  })
  return write

  function write (chunk) {
    if (streamError) {
      return Promise.reject(streamError)
    }
    return new Promise(function (resolve, reject) {
      const res = stream.write(chunk)
      if (res) {
        resolve()
        return
      }

      // Backpressure: settle on whichever of 'drain' / 'error' comes first
      // and drop the other listener so they do not pile up across writes.
      stream.once('drain', onDrain)
      stream.once('error', onError)

      function onDrain () {
        stream.removeListener('error', onError)
        resolve()
      }

      function onError (err) {
        stream.removeListener('drain', onDrain)
        reject(err)
      }
    })
  }
}
// Entry point: build a small in-memory source and pipe it to stdout.
// On read, the source pushes its whole payload followed by null, which
// signals end-of-stream.
const origin = new Readable({
  read () {
    for (const part of ['hello', ' ', 'world', null]) {
      this.push(part)
    }
  }
})
run(origin, process.stdout).catch(console.log)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment