Last active
September 25, 2024 04:57
-
-
Save sayanriju/25426f561e194f55e50a930516db1dfe to your computer and use it in GitHub Desktop.
Multigraph Pipeline implementation using events
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const EventEmitter = require('events');
/**
 * Pipeline class to manage a sequence of functions using an event-driven approach.
 *
 * Each `pipe(inputFn, outputFn)` call wires an edge: when `inputFn` has produced
 * a value, `outputFn` is invoked with it and its own result is published in turn.
 * Edges are keyed by `Function.prototype.name`, so every stage should be a
 * uniquely named function (anonymous functions all share the empty name `''`).
 *
 * An instance is single-use: once `run()` settles after the stages have started,
 * all connections and listeners are discarded and `pipe()` must be called again.
 */
class Pipeline {
  name; // Name of the pipeline
  #initialData = {}; // Initial data of the run in progress
  #connRunSeq = []; // Connections in the order they were piped
  #ee; // EventEmitter instance carrying stage results
  #abortRun = null; // Rejects the run in progress; null when no run is active

  /**
   * Constructor to initialize the EventEmitter instance.
   * @param {string} name - Name of the pipeline.
   */
  constructor(name) {
    this.name = name;
    this.#ee = new EventEmitter();
  }

  /**
   * Adds a pair of input and output functions to the pipeline.
   * @param {Function} inputFn - The input function.
   * @param {Function} outputFn - The output function.
   * @returns {Pipeline} - Returns the Pipeline instance for chaining.
   * @throws {Error} - Throws an error if either inputFn or outputFn is not a function.
   */
  pipe(inputFn, outputFn) {
    if (typeof inputFn !== "function" || typeof outputFn !== "function") {
      throw new Error('Both input and output functions are required');
    }
    // Add to connection run sequence
    this.#connRunSeq.push({ inputFn, outputFn });
    // Setup event listener for inputFn
    this.#ee.once(inputFn.name, async (payload) => {
      let result;
      try {
        result = await outputFn({ ...payload, initialData: this.#initialData });
      } catch (error) {
        // Without an active run there is nobody to notify, so surface the
        // failure the same way an uncaught stage error always did.
        if (this.#abortRun === null) {
          throw error;
        }
        // Route the failure to the pending run() promise instead of leaving it
        // as an unhandled rejection with run() hanging forever.
        this.#abortRun(error);
        return;
      }
      this.#ee.emit(outputFn.name, result); // Emit event for next function
    });
    return this; // Allow chaining
  }

  /**
   * Runs the pipeline starting from the first function.
   * @param {Function} [firstFn] - The first function to run. Defaults to the
   *   input function of the first connection.
   * @param {Object} [initialData={}] - The initial data to pass to the first function.
   *   It is also handed to every later stage as `initialData`.
   * @param {Function} [lastFn] - The last function to listen for completion.
   *   Defaults to the output function of the last connection.
   * @returns {Promise} - A promise that resolves with the result of the last function,
   *   or rejects with the error thrown by any stage.
   * @throws {Error} - Throws an error if no connections are specified.
   */
  async run(firstFn, initialData = {}, lastFn) {
    if (this.#connRunSeq.length === 0) {
      throw new Error('No connections specified'); // connRunSeq is only empty if pipe() was never called (or the instance was already run)
    }
    const entryFn = firstFn ?? this.#connRunSeq[0].inputFn;
    const exitFn = lastFn ?? this.#connRunSeq[this.#connRunSeq.length - 1].outputFn;
    this.#initialData = initialData;
    // Trigger first function; if it fails, the rejection propagates to the
    // caller and the connections are left in place.
    const result = await entryFn(initialData);
    return new Promise((resolve, reject) => {
      let settled = false;
      const settle = (finish, value) => {
        if (settled) {
          return;
        }
        settled = true;
        this.#reset();
        finish(value);
      };
      // Must be in place before emitting: a stage that throws synchronously
      // reports its failure while emit() is still on the stack.
      this.#abortRun = (error) => settle(reject, error);
      this.#ee.emit(entryFn.name, result); // Emit event for next function
      if (settled) {
        return; // Already failed; do not leave a stale completion listener behind
      }
      // Setup event listener for lastFn
      this.#ee.once(exitFn.name, (payload) => settle(resolve, payload));
    });
  }

  /**
   * Cleans up the instance after a run: forgets the initial data, the
   * connections, every listener and the abort hook.
   */
  #reset() {
    this.#initialData = {};
    this.#connRunSeq = [];
    this.#ee.removeAllListeners();
    this.#abortRun = null;
  }
}
module.exports = Pipeline;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment