Skip to content

Instantly share code, notes, and snippets.

@sayanriju
Last active September 25, 2024 04:57
Show Gist options
  • Save sayanriju/25426f561e194f55e50a930516db1dfe to your computer and use it in GitHub Desktop.
Save sayanriju/25426f561e194f55e50a930516db1dfe to your computer and use it in GitHub Desktop.
Multigraph Pipeline implementation using events
const EventEmitter = require('events');
/**
 * Event-driven pipeline that wires functions together into a graph.
 *
 * Each `pipe(inputFn, outputFn)` call registers a connection: when `inputFn`
 * has produced a result, `outputFn` is invoked with it and its own result is
 * published for whatever is connected downstream. Connections are keyed by
 * `Function#name`, so every function used in a pipeline should carry a
 * distinct name.
 *
 * A pipeline is single-use: once `run()` settles (successfully or because a
 * stage failed) all connections and listeners are discarded.
 */
class Pipeline {
  name; // Name of the pipeline
  #initialData = {}; // Initial data handed to run(); exposed to every stage as `initialData`
  #connRunSeq = []; // Connections in the order they were registered via pipe()
  #ee; // EventEmitter used to pass results between stages
  #failureEvent = Symbol('pipeline:failure'); // Private event a stage uses to report an error

  /**
   * Creates an empty pipeline backed by its own EventEmitter.
   * @param {string} name - Name of the pipeline.
   */
  constructor(name) {
    this.name = name;
    this.#ee = new EventEmitter();
  }

  /**
   * Connects `inputFn` to `outputFn`: once `inputFn`'s result is published,
   * `outputFn` is called with that result spread into a new object together
   * with `initialData`, and its (awaited) return value is published in turn.
   * @param {Function} inputFn - The function whose result triggers this connection.
   * @param {Function} outputFn - The function to invoke with that result.
   * @returns {Pipeline} - Returns the Pipeline instance for chaining.
   * @throws {Error} - Throws an error if either inputFn or outputFn is not a function.
   */
  pipe(inputFn, outputFn) {
    if (typeof inputFn !== "function" || typeof outputFn !== "function") {
      throw new Error('Both input and output functions are required');
    }
    // Add to connection run sequence
    this.#connRunSeq.push({ inputFn, outputFn });
    // `once`: each connection fires at most one time per run.
    this.#ee.once(inputFn.name, async (payload) => {
      try {
        const result = await outputFn({ ...payload, initialData: this.#initialData });
        this.#ee.emit(outputFn.name, result); // Emit event for next function
      } catch (err) {
        // Without this, a throwing/rejecting stage would only surface as an
        // unhandled rejection and run() would wait forever. Report it so
        // run() can reject with the original error instead.
        this.#ee.emit(this.#failureEvent, err);
      }
    });
    return this; // Allow chaining
  }

  /**
   * Runs the pipeline starting from the first function.
   *
   * If the first function itself throws or rejects, the error propagates and
   * the pipeline is left untouched. If any later stage throws or rejects, the
   * returned promise rejects with that error. Either way, once the run has
   * settled the pipeline is reset (connections and listeners removed).
   * @param {Function} [firstFn] - The first function to run. Defaults to the
   *   input function of the first registered connection.
   * @param {Object} [initialData={}] - The initial data to pass to the first function.
   * @param {Function} [lastFn] - The last function to listen for completion.
   *   Defaults to the output function of the last registered connection.
   * @returns {Promise} - A promise that resolves with the result of the last function.
   * @throws {Error} - Throws an error if no connections are specified.
   */
  async run(firstFn, initialData = {}, lastFn) {
    if (this.#connRunSeq.length === 0) {
      // Only possible when pipe() was never called (or the pipeline already ran).
      throw new Error('No connections specified');
    }
    const entryFn = firstFn ?? this.#connRunSeq[0].inputFn;
    const exitFn = lastFn ?? this.#connRunSeq[this.#connRunSeq.length - 1].outputFn;
    this.#initialData = initialData;

    // Trigger first function
    const result = await entryFn(initialData);

    return new Promise((resolve, reject) => {
      let settled = false;
      // Wraps resolve/reject so the instance is cleaned up exactly once,
      // whichever outcome arrives first.
      const settleWith = (settle) => (value) => {
        if (settled) {
          return;
        }
        settled = true;
        this.#initialData = {};
        this.#connRunSeq = [];
        this.#ee.removeAllListeners();
        settle(value);
      };

      // Must be attached before the first emit: a stage that throws
      // synchronously reports its failure during that very emit call.
      this.#ee.once(this.#failureEvent, settleWith(reject));
      this.#ee.emit(entryFn.name, result); // Emit event for next function

      // Attached after the first emit so that, when the first and last
      // functions share a name, the kick-off event is not mistaken for
      // completion. Stage results are always published asynchronously, so
      // the completion event cannot have been missed at this point.
      if (!settled) {
        this.#ee.once(exitFn.name, settleWith(resolve));
      }
    });
  }
}
// Expose the Pipeline class as this module's CommonJS export.
module.exports = Pipeline;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment