@ashleydavis
Last active March 5, 2019 08:06
A prototype multi-core task processing API
/*
This is a prototype, written as TypeScript pseudo-code, of a new multi-core task scheduler that I'm planning to open source.
It is based on earlier private code that I've used to munge masses of stock market data and that I also talk about in my
book Data Wrangling with JavaScript.
To give me private feedback on this please email me at [email protected].
The basic premise is that we can split a complex data processing job into multiple 'tasks', each of
which can potentially run in parallel on a multi-core machine.
A main aim of this API is simplicity. I wanted an API that works well with promises so that it's easy to await
the completion of a complex task graph.
*/
import * as os from 'os';
import { Task, ITask } from './task';
import { IScheduler } from './scheduler';
import { ClusterScheduler } from './cluster-scheduler';
import { strict as assert } from 'assert'; // Unlike console.assert, which only logs a failure, this assert throws.
const taskA = Task.register( // All tasks must be registered globally so they exist in both the master and worker processes.
    "TaskA", // Tasks must be named so they can be identified in the worker processes.
    async () => { // This function implements 'Task A'.
        console.log("Task A");
        return 1; // <-- This could be a complex, time-consuming calculation. Results must be serializable to JSON.
    }
);
const taskB = Task.register(
    "TaskB",
    async () => {
        console.log("Task B");
        return 2; // <-- Any of these task functions can be asynchronous and return a promise.
    }
);
const taskC = Task.register(
    "TaskC",
    async (a: number, b: number) => { // Task inputs must be serializable to JSON.
        console.log("Task C");
        return a + b; // <-- This task is dependent on pre-computed inputs from other tasks.
    }
);
//
// Your function to wire together a task with dependencies.
//
function createTask(): ITask<number> {
    //
    // Your code here decides how to build the task graph.
    // Here we are saying that task A and task B are inputs to task C.
    //
    return taskC.create(taskA.create(), taskB.create());
}
//
// Run application code.
//
async function main(scheduler: IScheduler) { // Input the scheduler to our application code.
    const task = createTask();
    const answer = await task.run(scheduler); // Run the task and await completion. Tasks A and B run in parallel on separate workers.
    assert(answer === 3);
}
//
// Bootstrap the cluster.
//
async function bootstrap() {
    const numWorkers = os.cpus().length; // Create one worker per CPU.
    // Create a Node.js cluster of numWorkers worker processes.
    // Note that this could also be a different type of scheduler, say one that works with a cluster in the cloud.
    const scheduler = new ClusterScheduler(numWorkers);
    // Let the scheduler call our main function.
    // Awaiting assumes init() returns a promise, so that a failure in main reaches the catch below.
    await scheduler.init(() => main(scheduler));
}
bootstrap()
    .catch(err => console.error(err && err.stack || err));
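
The gist does not include `./task`, `./scheduler` or `./cluster-scheduler`, so the shape of those modules is left open. As a rough illustration only, the sketch below shows one way the `Task.register` / `create` / `run` calls could fit together. Everything in it is an assumption rather than the author's implementation: `TaskDef`, `Task.lookup`, `runTask` and `InProcessScheduler` are hypothetical names, and the scheduler runs every task in the current process instead of handing it to a worker, so nothing actually runs on separate cores here.

```typescript
// A task function takes JSON-serializable inputs and returns a result (or a promise of one).
type TaskFn<R> = (...inputs: any[]) => R | Promise<R>;

// Hypothetical scheduler contract: run a registered task, identified by name, with resolved inputs.
interface IScheduler {
    runTask<R>(taskName: string, inputs: any[]): Promise<R>;
}

// A node in the task graph.
interface ITask<R> {
    run(scheduler: IScheduler): Promise<R>;
}

// A registered task definition. Calling create() produces a node in a task graph.
class TaskDef<R> {
    constructor(readonly name: string, readonly fn: TaskFn<R>) {}

    // The outputs of the dependency nodes become the inputs of this node, in order.
    create(...deps: ITask<any>[]): ITask<R> {
        const taskName = this.name;
        return {
            run(scheduler) {
                // Start all dependencies at once, then submit this task with their results.
                return Promise.all(deps.map(dep => dep.run(scheduler)))
                    .then(inputs => scheduler.runTask<R>(taskName, inputs));
            },
        };
    }
}

// Global registry, so a task can be found by name wherever the code is loaded.
class Task {
    private static registry = new Map<string, TaskDef<any>>();

    static register<R>(name: string, fn: TaskFn<R>): TaskDef<R> {
        if (Task.registry.has(name)) {
            throw new Error(`Task "${name}" is already registered.`);
        }
        const def = new TaskDef<R>(name, fn);
        Task.registry.set(name, def);
        return def;
    }

    static lookup(name: string): TaskDef<any> {
        const def = Task.registry.get(name);
        if (!def) {
            throw new Error(`Task "${name}" is not registered.`);
        }
        return def;
    }
}

// Stand-in scheduler that runs tasks in this process. A cluster-based scheduler
// would presumably send the task name and inputs to a worker process instead.
class InProcessScheduler implements IScheduler {
    async runTask<R>(taskName: string, inputs: any[]): Promise<R> {
        const def = Task.lookup(taskName);
        // Round-trip through JSON to mimic crossing a process boundary.
        const wireInputs: any[] = JSON.parse(JSON.stringify(inputs));
        const result = await def.fn(...wireInputs);
        return JSON.parse(JSON.stringify(result)) as R;
    }
}

const one = Task.register("One", async () => 1);
const two = Task.register("Two", async () => 2);
const sum = Task.register("Sum", async (a: number, b: number) => a + b);

function buildGraph(): ITask<number> {
    return sum.create(one.create(), two.create());
}

buildGraph()
    .run(new InProcessScheduler())
    .then(answer => console.log(answer)); // prints 3
```

Passing the task name rather than the function to the scheduler is what makes the global registration step matter: a function cannot be sent to another process as JSON, but a name can, and the receiving side can look the function up again in its own copy of the registry.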