Last active
March 15, 2024 03:37
-
-
Save SydneyUni-Jim/7168b3190ca49bc9a0f5fd90ee84ad13 to your computer and use it in GitHub Desktop.
Node.js Transform that executes in parallel with an async transformer
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { strict as assert } from 'node:assert' | |
import { Transform } from 'node:stream' | |
import os from 'node:os' | |
/**
 * Transform stream that runs up to `maxParallel` asynchronous transformations
 * concurrently.
 *
 * Subclasses implement `_asyncTransform`. Each written chunk starts one
 * transformation immediately; the writable side is only acknowledged while
 * fewer than `maxParallel` results are outstanding (still being computed, or
 * computed but not yet accepted by the readable side). Results are emitted in
 * completion order, which is not necessarily the order the chunks were written.
 * Results that are `null` or `undefined` are dropped.
 */
export class ParallelAsyncTransform extends Transform {
  /**
   * @param {number} [maxParallel] Maximum number of outstanding transformations.
   *   Also used as the default `highWaterMark`.
   * @param {object} [options] Options forwarded to the `Transform` constructor;
   *   they override the `highWaterMark` and `objectMode` defaults set here.
   *   `options.signal`, if present, is also handed to every `_asyncTransform` call.
   */
  constructor(maxParallel = os.availableParallelism(), options = {}) {
    super({
      highWaterMark: maxParallel,
      objectMode: true,
      ...options
    })
    // Finished results waiting to be pushed to the readable side (FIFO).
    this._buffer = []
    this._flushed = false
    this._maxParallel = maxParallel
    // Pending write or flush callback, invoked once `_drained` becomes true.
    this._onDrainedCallback = undefined
    // Number of results taken out of `_buffer` so far.
    this._popCount = 0
    // False after push() reported a full readable buffer, until _read() is called again.
    this._readableWantsData = true
    // Number of chunks handed to `_asyncTransform` so far.
    this._writeCount = 0
    this._signal = options?.signal
  }

  /**
   * Transforms one chunk. Must be overridden by subclasses.
   *
   * @param {*} chunk The written chunk.
   * @param {string} encoding The encoding passed along with the write.
   * @param {AbortSignal} [signal] The `signal` option given to the constructor, if any.
   * @returns {Promise<*>|*} The transformed value; `null`/`undefined` emits nothing.
   */
  async _asyncTransform(chunk, encoding, signal) {
    throw new Error('_asyncTransform not implemented')
  }

  /**
   * Called by the readable side when it can accept more data: resumes pushing
   * results that were held back after push() returned false.
   */
  _read(size) {
    this._readableWantsData = true
    this._drain()
  }

  /**
   * Starts the transformation of `chunk` and acknowledges the write right away
   * when there is spare capacity; otherwise the acknowledgement is deferred
   * until `_drain` frees a slot.
   */
  _write(chunk, encoding, callback) {
    this._writeCount++
    // Wrapping the call in a promise executor turns a synchronous throw into a
    // rejection and lets a non-async override return a plain value.
    const pending = new Promise(resolve => {
      resolve(this._asyncTransform(chunk, encoding, this._signal))
    })
    pending
      .then(data => {
        if (this.destroyed) return
        this._buffer.push(data)
        this._drain()
      })
      .catch(err => this.destroy(err))
    if (this._writeCount - this._popCount < this._maxParallel) {
      callback()
    } else {
      assert(!this._onDrainedCallback, '_onDrainedCallback is still defined')
      this._onDrainedCallback = callback
    }
  }

  /**
   * Called once all writes have been acknowledged; `callback` is deferred
   * until every outstanding result has been pushed.
   */
  _flush(callback) {
    this._flushed = true
    this._onDrainedCallback = callback
    this._drain()
  }

  /**
   * Pushes buffered results while the readable side accepts them, then invokes
   * the pending write/flush callback if enough capacity has been freed.
   */
  _drain() {
    while (this._readableWantsData && this._buffer.length) {
      const data = this._buffer.shift()
      this._popCount++
      if (data == null) continue
      // push() returning false means the readable buffer is full: keep the
      // remaining results here until _read() asks for more.
      this._readableWantsData = this.push(data)
    }
    if (this._drained && this._onDrainedCallback) {
      const callback = this._onDrainedCallback
      this._onDrainedCallback = undefined
      callback()
    }
  }

  /**
   * True when the pending callback may run: after a flush, only when nothing
   * is outstanding; before that, whenever there is room for another
   * transformation.
   */
  get _drained() {
    const transformsInFlight = this._writeCount - this._popCount
    return this._flushed ? !transformsInFlight : transformsInFlight < this._maxParallel
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
This is a "modernised" version of https://github.com/mafintosh/parallel-transform.