Skip to content

Instantly share code, notes, and snippets.

@ever-dev
Last active September 7, 2021 14:55
Show Gist options
  • Save ever-dev/bb7d0cf82954e488029a92403cedc94b to your computer and use it in GitHub Desktop.
Save ever-dev/bb7d0cf82954e488029a92403cedc94b to your computer and use it in GitHub Desktop.
Run processes in parallel or series by limiting the number of concurrently running processes.

Description

This service aims to run processes in parallel or in series by limiting the number of concurrently running processes. You may want to run processes in parallel or in series, and couple them to build an awesome workflow!

For example, image

graph LR
A[Start] --> B((Process1))
A --> C((Process2))
A --> D((Process3))
B --> B1((Process1-1))
B --> B2((Process1-2))
B1 --> E((Process4))
B2 --> E

E --> F((Process5))
C --> F
D --> F

F --> G[End]
Loading

Of course, you can build this workflow with Promises, but the problem I want to address here is to limit the maximum number of processes that are running concurrently.

How to use

Create a service with the following parameters:

  • processList: The array of processes. Each entry is either an async function (a single process) or a nested array (a group of processes). The root array runs in parallel, and each level of nesting alternates between parallel and series.
  • maxProcessCount: The maximum number of processes allowed to run concurrently.

Example.

For the above workflow, run the following code

// The constructor takes the process list first and the concurrency limit second.
// The instance is named `service` so it does not shadow Node's global `process`.
const service = new ProcessExecuteService(
  [
    // parallel
    [
      // series
      [
        // parallel
        [
          // series
          'process1',
          [
            // parallel
            'process1-1',
            'process1-2',
          ],
          'process4',
        ],
        'process2',
        'process3',
      ],
      'process5',
    ],
  ],
  2,
  () => console.log('finished'),
);
service.run();
import ProcessExecuteService from './processExecuteService.js';
// The constructor takes the process list first and the concurrency limit second.
// The instance is named `service` so it does not shadow Node's global `process`.
const service = new ProcessExecuteService(
  [
    // parallel
    [
      // series
      [
        // parallel
        [
          // series
          'process1',
          [
            // parallel
            'process1-1',
            'process1-2',
          ],
          'process4',
        ],
        'process2',
        'process3',
      ],
      'process5',
    ],
  ],
  2,
);
service.run();
/**
 * Runs a nested list of processes while capping how many run at the same time.
 *
 * The process list is a tree: every array is a group, every other value is a
 * leaf. The root array runs its entries in parallel, and each level of nesting
 * flips the mode (parallel -> series -> parallel -> ...).
 *
 * Leaves that are functions are called and their returned promise is awaited.
 * Any other leaf value is treated as a demo placeholder: it is logged with
 * `console.log` after `PLACEHOLDER_DELAY_MS` milliseconds.
 */
export default class ProcessExecuteService {
  /** Simulated duration (ms) of a placeholder leaf that is not a function. */
  static PLACEHOLDER_DELAY_MS = 1000;

  /*
    Properties
  */
  _maximumProcessCount = 0; // Maximum number of concurrent processes
  _processQueue = []; // IDs of the nodes that are ready to be started
  _runningProcessCount = 0; // Number of leaf processes currently running
  _processTree = {}; // Node ID -> node record
  _nextProcessId = 0; // Sequence ID to be assigned to the next node
  _onFinished = null; // Optional callback invoked when the whole tree is done
  _activeRun = null; // { promise, resolve, reject } of the run in progress
  _runToken = 0; // Changes on every run/abort so stale completions are ignored

  /**
   * Constructor
   * @param {Array} processList Array of processes to execute
   * @param {Number} maxProcessCount Maximum number of concurrent processes
   * @param {Function|null} onFinished Optional callback invoked once every process has finished
   * @throws {TypeError} If processList is not an array
   * @throws {RangeError} If maxProcessCount is not a number >= 1
   */
  constructor(
    processList,
    maxProcessCount = Number.MAX_SAFE_INTEGER,
    onFinished = null,
  ) {
    if (!Array.isArray(processList)) {
      throw new TypeError('processList must be an array');
    }
    // A limit below 1 (or NaN) could never start anything and would hang forever.
    if (typeof maxProcessCount !== 'number' || !(maxProcessCount >= 1)) {
      throw new RangeError('maxProcessCount must be a number >= 1');
    }
    this._maximumProcessCount = maxProcessCount;
    this._onFinished = typeof onFinished === 'function' ? onFinished : null;
    // Node 0 is the synthetic root group that owns the top-level entries.
    this._processTree = {
      0: {
        id: 0,
        isFinished: false,
        masterId: null,
        parentProcessId: null,
        isGroup: true,
        childIds: [],
        dependentIds: [],
      },
    };
    this._nextProcessId = 1;
    this._constructTree(0, processList, true);
  }

  /**
   * Construct tree from list of processes
   * @param {Number} parentProcessId ID of process group
   * @param {Array} processes Array of processes
   * @param {Boolean} isParallel True to run processes in parallel, False in series
   */
  _constructTree(parentProcessId, processes, isParallel) {
    let previousSiblingId = null;
    for (const entry of processes) {
      const id = this._nextProcessId++;
      const isGroup = Array.isArray(entry);
      // In series each entry waits for the previous sibling; in parallel nobody waits.
      const masterId = isParallel ? null : previousSiblingId;
      const node = {
        id,
        parentProcessId,
        masterId,
        isFinished: false,
        isGroup,
        childIds: [],
        dependentIds: [],
      };
      this._processTree[id] = node;
      // Index the relations once so scheduling never has to scan the whole tree.
      this._processTree[parentProcessId].childIds.push(id);
      if (masterId !== null) {
        this._processTree[masterId].dependentIds.push(id);
      }
      if (isGroup) {
        this._constructTree(id, entry, !isParallel);
      } else {
        node.process = entry;
      }
      previousSiblingId = id;
    }
  }

  /**
   * Add processes to queue, which are dependent on the given process
   * @param {Number} processId Process ID
   */
  _triggerSlaveProcess(processId) {
    this._processQueue.unshift(...this._processTree[processId].dependentIds);
  }

  /**
   * Add processes to queue, which are children of the given process
   * and do not wait for a sibling
   * @param {Number} processId Process ID
   */
  _triggerChildProcess(processId) {
    const readyChildIds = this._processTree[processId].childIds.filter(
      (childId) => this._processTree[childId].masterId === null,
    );
    this._processQueue.unshift(...readyChildIds);
  }

  /**
   * Check if parent process is finished by looking at all of its children
   * @param {Number} parentProcessId Parent Process Id
   */
  _checkParentProcess(parentProcessId) {
    const { childIds } = this._processTree[parentProcessId];
    // If all children are finished, mark the parent group as done
    if (childIds.every((childId) => this._processTree[childId].isFinished)) {
      this._handleProcessFinish(parentProcessId);
    }
  }

  /**
   * Post process handler
   * @param {Number} processId Finished Process Id
   */
  _handleProcessFinish(processId) {
    const node = this._processTree[processId];
    // Mark this process as done
    node.isFinished = true;
    // Trigger processes that are dependent on this process
    this._triggerSlaveProcess(processId);
    // Only the root has no parent. Compare against null explicitly:
    // the root's ID is 0, which a truthiness check would mistake for "no parent".
    if (node.parentProcessId === null) {
      this._completeRun();
      return;
    }
    this._checkParentProcess(node.parentProcessId);
  }

  /**
   * Settle the active run successfully and notify the onFinished callback
   */
  _completeRun() {
    const run = this._activeRun;
    if (run === null) return;
    this._activeRun = null;
    try {
      this._onFinished?.();
    } catch (error) {
      // Surface callback failures through the promise instead of losing them.
      run.reject(error);
      return;
    }
    run.resolve();
  }

  /**
   * Stop scheduling and reject the active run
   * @param {*} error Reason the failing process rejected with
   */
  _abortRun(error) {
    const run = this._activeRun;
    if (run === null) return;
    this._activeRun = null;
    // Invalidate completions of processes that are still in flight.
    this._runToken++;
    this._processQueue = [];
    run.reject(error);
  }

  /**
   * Start a single leaf
   * @param {*} task Function to call, or any other value as a placeholder
   * @returns {Promise} Settles when the leaf is done; rejects if the function throws or rejects
   */
  async _executeProcess(task) {
    if (typeof task === 'function') {
      return task();
    }
    // Placeholder leaf: pretend to work for a while, then log the value.
    await new Promise((resolve) => {
      setTimeout(resolve, ProcessExecuteService.PLACEHOLDER_DELAY_MS);
    });
    console.log(task);
    return undefined;
  }

  /**
   * Take processes from the queue and start them while capacity is available
   */
  _runProcessFromQueue() {
    const token = this._runToken;
    while (
      this._runningProcessCount < this._maximumProcessCount &&
      this._processQueue.length > 0
    ) {
      const node = this._processTree[this._processQueue.shift()];
      if (node.isGroup) {
        if (node.childIds.length === 0) {
          // An empty group has nothing to wait for; finish it right away
          // so that whatever depends on it is not blocked forever.
          this._handleProcessFinish(node.id);
        } else {
          // If it has children processes, put them in the queue
          this._triggerChildProcess(node.id);
        }
        continue;
      }
      this._runningProcessCount++;
      this._executeProcess(node.process).then(
        () => {
          if (token !== this._runToken) return; // belongs to an old/aborted run
          this._runningProcessCount--;
          this._handleProcessFinish(node.id);
          this._runProcessFromQueue();
        },
        (error) => {
          if (token !== this._runToken) return; // belongs to an old/aborted run
          this._abortRun(error);
        },
      );
    }
  }

  /**
   * Run all processes
   *
   * Calling run() while a run is in progress returns the promise of that run.
   * Calling it after a run has settled starts the whole tree again.
   * @returns {Promise} Resolves when every process has finished; rejects with
   * the first error thrown/rejected by a process (no further processes are
   * started after that) or thrown by the onFinished callback
   */
  run() {
    if (this._activeRun !== null) {
      return this._activeRun.promise;
    }
    this._runToken++;
    // Clear the flags of a previous run so the tree can be executed again.
    for (const node of Object.values(this._processTree)) {
      node.isFinished = false;
    }
    this._runningProcessCount = 0;
    this._processQueue = [0];
    const run = {};
    run.promise = new Promise((resolve, reject) => {
      run.resolve = resolve;
      run.reject = reject;
    });
    this._activeRun = run;
    this._runProcessFromQueue();
    return run.promise;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment