Skip to content

Instantly share code, notes, and snippets.

@kharandziuk
Created July 5, 2019 15:41
Show Gist options
  • Select an option

  • Save kharandziuk/2f6428b047d9cc2801ed4c995750b6dc to your computer and use it in GitHub Desktop.

Select an option

Save kharandziuk/2f6428b047d9cc2801ed4c995750b6dc to your computer and use it in GitHub Desktop.
a solution for a sequence of streams
const stream = require('stream');
const process = require('process')
const { promisify } = require('util')
const fromList = (lst) => new stream.Readable({
read() {
if (lst.length) {
this.push(String(lst.shift()))
} else {
this.push(null)
}
}
})
const _finished = promisify(stream.finished)
const sequenceStream = (streams) => {
const resultingStream = new stream.PassThrough()
let isNext = Promise.resolve()
for(const [i, curStream] of streams.entries()) {
isNext = isNext.then(() => {
curStream.pipe(resultingStream, {end: i === streams.length -1})
return _finished(curStream)
}).catch((err) => {
resultingStream.write(err)
})
}
return resultingStream
}
sequenceStream([
fromList([1, 2, 3, 4]),
fromList([5, 6, 7, 8]),
fromList([9, 10])
]).pipe(process.stdout)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment