Created
November 13, 2019 11:02
-
-
Save jackyef/494c775656c4b514723c0c0d54f1fed1 to your computer and use it in GitHub Desktop.
A simple TaskManager implementation for Node.js
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const EventEmitter = require('events').EventEmitter; | |
/**
 * A unit of work wrapping a callback, with a promise that settles with the
 * callback's outcome.
 *
 * Instances carry the scheduling metadata read by TaskManager: `name`,
 * `type` (which worker queue to use) and `next` (a follow-up task).
 */
class Task extends EventEmitter {
  /**
   * @param {Function} cb - Work to run; may return a plain value or a promise.
   * @param {string} [name='N/A'] - Label used in logs.
   * @param {string} [type=''] - Queue this task should be enqueued to; an
   *   empty string means the task does not need a queue.
   */
  constructor(cb, name = 'N/A', type = '') {
    super();
    this.cb = cb;
    this.name = name;
    this.type = type; // will be used to determine which queue this task should be enqueued to
    this.next = null; // can assign another task to .next for sequencing tasks
    this.started = false;

    // Keep the settle functions on the instance so start() can settle the
    // promise later; to wait for the task, callers just await task.promise.
    this.promise = new Promise((resolve, reject) => {
      this.resolve = resolve;
      this.reject = reject;
    });

    // Prefer the callback's own function name; fall back to the task name
    // given at construction time when the callback is anonymous.
    this.toString = () => `[Task: ${this.cb.name || name}]`;
  }

  /**
   * Runs the callback once and settles `this.promise` with its outcome.
   * Calling start() again after the first call is a no-op.
   *
   * @returns {Promise<void>} Resolves once `this.promise` has been settled;
   *   never rejects (failures are routed to `this.promise`).
   */
  async start() {
    if (this.started) {
      return;
    }
    // Flip the flag before invoking the callback so a re-entrant or repeated
    // start() cannot run the work twice.
    this.started = true;

    try {
      // `await` handles both plain values and promises/thenables, and a
      // synchronous throw from cb() lands in the catch below.
      this.resolve(await this.cb());
    } catch (err) {
      this.reject(err);
    }
  }
}
/**
 * Polling scheduler for task objects.
 *
 * A task is anything exposing `name`, `type`, `next`, `start()`, `promise`
 * and `reject(err)`. Tasks with an empty `type` start as soon as the work
 * loop sees them; tasks with a `type` need a free worker from
 * `queues[type].availableWorkers` and wait in the backlog until one is free.
 */
class TaskManager {
  /**
   * @param {object} [options]
   * @param {number} [options.pollIntervalMs=1000] - Delay between work-loop
   *   passes and between shutdown retries.
   * @param {object} [options.queues] - Map of task type to
   *   `{ availableWorkers: string[] }`. Defaults to one `build` worker and
   *   one `test` worker.
   */
  constructor({
    pollIntervalMs = 1000,
    queues = {
      build: {
        availableWorkers: ['0.0.0.1'], // array of workers for this queue type
      },
      test: {
        availableWorkers: ['1.2.3.4'],
      },
    },
  } = {}) {
    this.pollIntervalMs = pollIntervalMs;
    this.queues = queues;
    this.backlogTasks = [];
    this.activeTasks = new Map();
    this.busyWorkers = new Map();
    // Handle of the pending work-loop timer; undefined while the loop is stopped.
    this.timeout = undefined;
  }

  /**
   * Appends a task to the backlog; it is picked up by the next work-loop pass.
   *
   * @param {object} task - Task to enqueue.
   */
  scheduleTask(task) {
    this.backlogTasks.push(task);
  }

  /**
   * Starts the work loop. The first pass runs after `pollIntervalMs`.
   * Calling start() while the loop is already running does nothing.
   */
  start() {
    if (this.timeout !== undefined) {
      // A loop is already armed; a second one would double-dispatch passes.
      return;
    }

    // Prints a snapshot of backlog, active tasks and busy workers.
    const debug = () => {
      console.log('[TaskManager workloop]');
      console.log('Tasks in backlog: ', this.backlogTasks.map((t) => t.name).join(', '));
      const activeNames = [...this.activeTasks.keys()].map((t) => t.name);
      console.log('Task in progress: ', activeNames.join(', '));
      if (this.busyWorkers.size > 0) {
        console.log('Busy workers:');
        this.busyWorkers.forEach((entry, worker) => {
          console.log('-', `Worker [${worker}] is working on ${entry.task.name}`);
        });
      } else {
        console.log('All workers are available!');
      }
      console.log('===================================');
    };

    // Starts a task and, once it settles, frees its bookkeeping entries.
    const performTask = (task, worker) => {
      this.activeTasks.set(task, task.name);
      console.log(`started task ${task}`);
      task.start();

      return task.promise
        .then(
          (result) => {
            console.log(`Task "${task.name}" finished, result:`, result);
            return true;
          },
          (err) => {
            console.log(`Task "${task.name}" errored, err:`, err);
            return false;
          },
        )
        .then((succeeded) => {
          // Runs for success AND failure: a failed task must not stay
          // "active" or keep its worker, otherwise the queue starves and
          // shutdown() can never complete.
          this.activeTasks.delete(task);
          if (worker) {
            this.queues[task.type].availableWorkers.push(worker);
            this.busyWorkers.delete(worker);
          }
          // A chained task only follows a successful predecessor; it is put
          // in the backlog and the work loop decides when to run it.
          if (succeeded && task.next) {
            this.scheduleTask(task.next);
          }
        });
    };

    // Tries to start one backlog task.
    // Returns true when the task has left the backlog (started or rejected),
    // false when it must wait for a later pass.
    const dispatch = (task) => {
      if (!task.type) {
        // no type, this task can be performed immediately
        performTask(task);
        return true;
      }

      if (!Object.prototype.hasOwnProperty.call(this.queues, task.type)) {
        // No such queue: waiting would never help, and dereferencing the
        // missing queue would throw and kill the work loop. Fail the task.
        const err = new Error(`[TaskManager] Unknown task.type: ${task.type}`);
        console.log(`Task "${task.name}" errored, err:`, err);
        // The failure is logged above; this handler only keeps the rejection
        // from being reported as unhandled. Awaiters still see it.
        task.promise.catch(() => {});
        task.reject(err);
        return true;
      }

      const { availableWorkers } = this.queues[task.type];
      if (availableWorkers.length === 0) {
        console.log(`[TaskManager] No worker available for task.type: ${task.type}; Will check again in next loop.`);
        return false;
      }

      const usedWorker = availableWorkers.shift(); // take one worker out
      // add it to the map so we know it's busy
      this.busyWorkers.set(usedWorker, { task });
      performTask(task, usedWorker);
      return true;
    };

    // One pass of the loop: offer every backlog task a chance to start.
    const performWork = () => {
      console.log('[TaskManager] Checking if there are works in backlog...');
      if (this.backlogTasks.length > 0) {
        // Empty the backlog in place and re-queue only the tasks that could
        // not start, preserving their order. A task waiting on a busy queue
        // therefore no longer blocks the tasks behind it.
        const pending = this.backlogTasks.splice(0);
        for (const task of pending) {
          if (!dispatch(task)) {
            this.backlogTasks.push(task);
          }
        }
      } else {
        console.log('[TaskManager] No tasks in backlog!');
      }

      // schedule next work
      this.timeout = setTimeout(performWork, this.pollIntervalMs);
      // for debugging
      debug();
    };

    this.timeout = setTimeout(performWork, this.pollIntervalMs);
  }

  /**
   * Stops the work loop once it is safe to do so, retrying every
   * `pollIntervalMs` until then.
   *
   * "Safe" means no task is active and, while the work loop is running, the
   * backlog is empty too, so a chained task queued by a just-finished task
   * is not stranded. If the loop is not running the backlog is ignored,
   * since nothing would ever drain it.
   *
   * @returns {Promise<string>} Resolves with a confirmation message after
   *   the loop timer has been cleared.
   */
  shutdown() {
    console.log('[TaskManager] Trying to shutdown...');

    return new Promise((resolve) => {
      const attempt = () => {
        const loopRunning = this.timeout !== undefined;
        const backlogPending = loopRunning && this.backlogTasks.length > 0;

        if (this.activeTasks.size < 1 && !backlogPending) {
          clearTimeout(this.timeout);
          this.timeout = undefined;
          console.log('[TaskManager] Shutdown successful!');
          resolve('[TaskManager] Shutdown successful!');
          return;
        }

        console.log('Failed to shutdown. There are still', this.activeTasks.size, 'tasks running.');
        const activeNames = [...this.activeTasks.keys()].map((t) => t.name);
        console.log('Tasks:', activeNames.join(', '));
        if (backlogPending) {
          console.log('Tasks in backlog: ', this.backlogTasks.map((t) => t.name).join(', '));
        }
        setTimeout(attempt, this.pollIntervalMs);
      };

      attempt();
    });
  }
}
// Demo script. A single TaskManager instance is shared by everything below
// (it is meant to be used as a singleton).
const tm = new TaskManager();

// Factory for the "wait, then resolve" tasks used in the pipelines below.
// The callback is passed as an inline anonymous arrow on purpose, so the
// task's string form keeps falling back to `name`.
const delayedTask = (name, delayMs, type) =>
  new Task(
    () =>
      new Promise((resolve) => {
        setTimeout(() => {
          resolve(`[${name}] Resolved after ${delayMs}ms`);
        }, delayMs);
      }),
    name,
    type,
  );

// --- Part 1: a synchronous task, an asynchronous task, and a chained task ---
const TaskA = new Task(() => {
  console.log('[TaskA] Just doing some synchronous operations.');
  return 'something';
}, 'TaskA');

const TaskB = new Task(() => {
  console.log('[TaskB] Starting an asynchronous operation...');
  return new Promise((resolve) => {
    setTimeout(() => {
      resolve('[TaskB] Resolved after 3000ms');
    }, 3000);
  });
}, 'TaskB');

const TaskC = new Task(() => {
  console.log('[TaskC] Should only be done after TaskB finishes');
  return '[TaskC] Finished!';
}, 'TaskC');

// TaskC is queued by the manager once TaskB has finished.
TaskB.next = TaskC;

// --- Part 2: two Build -> Deploy -> PostDeploy pipelines ---
// Only the build steps carry a queue type ('build'); the rest are untyped.
const BuildTaskA = delayedTask('BuildTaskA', 10000, 'build');
const DeployTaskA = delayedTask('DeployTaskA', 3000);
const PostDeployTaskA = delayedTask('PostDeployTaskA', 3000);
BuildTaskA.next = DeployTaskA;
DeployTaskA.next = PostDeployTaskA;

const BuildTaskB = delayedTask('BuildTaskB', 10000, 'build');
const DeployTaskB = delayedTask('DeployTaskB', 3000);
const PostDeployTaskB = delayedTask('PostDeployTaskB', 3000);
BuildTaskB.next = DeployTaskB;
DeployTaskB.next = PostDeployTaskB;

// --- Run ---
tm.scheduleTask(TaskA);
tm.scheduleTask(TaskB);
tm.start();

// Ask for shutdown shortly after the first work-loop pass; shutdown() keeps
// retrying while tasks are still active instead of stopping right away.
setTimeout(() => {
  tm.shutdown();
}, 1100);

tm.scheduleTask(BuildTaskA);
tm.scheduleTask(BuildTaskB);

// A task can also be awaited directly through its promise.
(async () => {
  console.log('Waiting for BuildTaskA...');
  await BuildTaskA.promise;
  console.log('BuildTaskA finished!');
})();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment