Last active
November 2, 2022 07:20
-
-
Save FranckFreiburger/9af693b0432d7ee85d4e360e524551dc to your computer and use it in GitHub Desktop.
nodejs readable and writable stream to duplex stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Duplex } from 'stream' | |
/**
 * Presents an independent Writable and an independent Readable as one Duplex.
 *
 * Data written to the Duplexify is written to `writable`; data read from the
 * Duplexify is pulled from `readable`. Object mode and high-water marks of
 * each side are copied from the corresponding wrapped stream.
 *
 * Errors raised by either wrapped stream destroy the Duplexify with that
 * error (so it emits 'error' followed by 'close'), and destroying the
 * Duplexify destroys both wrapped streams.
 */
export default class Duplexify extends Duplex {
  /**
   * @param {import('stream').Writable} writable - sink for data written to this stream.
   * @param {import('stream').Readable} readable - source of data read from this stream.
   */
  constructor(writable, readable) {
    super({
      readableObjectMode: readable.readableObjectMode,
      writableObjectMode: writable.writableObjectMode,
      readableHighWaterMark: readable.readableHighWaterMark,
      writableHighWaterMark: writable.writableHighWaterMark,
    });

    this._writable = writable;
    this._readable = readable;

    // readable.read() should only be called on streams operating in paused mode.
    readable.pause();

    // If the sink is ended by someone else, close our writable side as well.
    // When we ended it ourselves (see _final) there is nothing left to do.
    writable.on('finish', () => {
      if (!this.writableEnded) {
        this.end();
      }
    });

    // End of the source ends our readable side.
    readable.on('end', () => void this.push(null));

    // Forward errors (unlike pipe()). destroy(err) is used instead of a bare
    // emit('error') so the stream state is updated, the other wrapped stream
    // is torn down too, and a repeated report of the same failure is ignored.
    const forwardError = (err) => void this.destroy(err);
    writable.on('error', forwardError);
    readable.on('error', forwardError);
  }

  /**
   * Destroys both wrapped streams, then reports completion.
   *
   * @param {Error|null} err - reason for destruction, if any.
   * @param {(err?: Error|null) => void} callback - must be invoked, otherwise
   *   'close' (and 'error') are never emitted on this stream.
   */
  _destroy(err, callback) {
    this._writable.destroy(err);
    this._readable.destroy(err);
    callback(err);
  }

  /**
   * Writes one chunk to the wrapped writable.
   *
   * The inner write callback is handed straight through, so the next chunk is
   * only requested once this one has been processed and a failed write is
   * reported to the caller instead of being treated as success.
   */
  _write(chunk, encoding, callback) {
    this._writable.write(chunk, encoding, callback);
  }

  /**
   * Ends the wrapped writable and delays our own 'finish' until it finished.
   *
   * @param {(err?: Error|null) => void} callback
   */
  _final(callback) {
    const sink = this._writable;

    if (sink.writableFinished) {
      // Already finished elsewhere; calling end() again would report an error.
      callback();
      return;
    }

    if (sink.writableEnded) {
      // end() was already called elsewhere; just wait for the flush.
      sink.once('finish', () => callback());
      return;
    }

    sink.end(callback);
  }

  /**
   * Pulls the next chunk from the wrapped readable.
   *
   * The size hint is advisory and deliberately not forwarded: read(size)
   * returns null while fewer than `size` bytes are buffered, whereas read()
   * hands over whatever is available right now.
   */
  _read() {
    const chunk = this._readable.read();

    if (chunk !== null) {
      this.push(chunk);
      return;
    }

    // Nothing buffered yet: retry as soon as the source signals new data.
    this._readable.once('readable', () => void this._read());
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
see nodejs/help#2460