Skip to content

Instantly share code, notes, and snippets.

@DmitrySoshnikov
Created October 15, 2015 19:58
Show Gist options
  • Save DmitrySoshnikov/61430cadadcc37be7429 to your computer and use it in GitHub Desktop.
Save DmitrySoshnikov/61430cadadcc37be7429 to your computer and use it in GitHub Desktop.
Educational cooperative processes scheduler
/**
* Cooperative tasks scheduler.
*
* by Dmitry Soshnikov <[email protected]>
* MIT Style License
*
* An educational implementation to show the work of a processes scheduler.
* In practice you may prefer to use a library like Task.js, or similar.
*
* Test, Node 4 or transformed with Babel:
*
* node ~/scheduler.js
*
* Usage example:
*
* let data = [];
*
* let p1 = spawn(function* () {
* yield data.push('p1:1');
* yield data.push('p1:2');
* });
*
* let p2 = spawn(function* () {
* yield data.push('p2:1');
* yield data.push('p2:2');
* });
*
* Check cooperative threading:
*
* setTimeout(() => console.log(data), 500);
*
* > ['p1:1', 'p2:1', 'p1:2', 'p2:2']
*
* Implementation details:
*
* This is ES6 version with delegating generators, which
* dramatically simplifies the handling of the nested generator
* results. For comparison, see how we did it four years ago
* with SpiderMonkey's generators implementation:
* https://gist.github.com/DmitrySoshnikov/1130172
* There we had to maintain a manual call stack for generators, a
* technique called "trampolining".
*
* Further improvements:
*
* Q: How to turn them into similar to Erlang's processes?
* I.e. avoid explicit yield'ing.
*
* A: Transform the code by inserting `yield` after each
* "reduction" part, e.g. after each statement.
*/
'use strict';
/**
 * A single cooperative process. It wraps the generator created by calling
 * `handler` and advances that generator one step per `run()` call.
 */
class Process {
  /**
   * @param {Function} handler Generator function. It is invoked right away,
   *     with no arguments, to create the generator backing this process.
   */
  constructor(handler) {
    // Create the generator first; the pid counter is only bumped afterwards.
    this._handler = handler();
    // Value passed into the generator on the next step (nothing initially).
    this._value = undefined;
    Process.pid += 1;
    this.pid = Process.pid;
  }

  /**
   * Advances the generator by one step. The value produced by the previous
   * step is sent back in as the result of the pending `yield`.
   *
   * @returns {{value: *, done: boolean}} The iterator result of this step.
   */
  run() {
    const step = this._handler.next(this._value);
    this._value = step.value;
    return step;
  }
}

// Last pid handed out; every new Process increments it and takes the result.
Process.pid = 0;
/**
* Processes scheduler. Maintains processes run queue, and executes them
* in cooperative mode. This educational version implements simple
* "round robin" algorithm for the scheduling. In production you may
* want to consider implementing process priorities, etc.
*/
// NOTE: kept as `var` so the top-level binding behaves exactly as before.
var Scheduler = (function initScheduler() {
  // Start the handle loop on a timer; by the time the callback fires,
  // `Scheduler` has been assigned.
  setTimeout(() => Scheduler.handleLoop(), 0);

  /**
   * Run queue: processes ready to be executed. New entries are added at
   * the front and the next process to run is taken from the back (FIFO).
   */
  const runQueue = [];

  /**
   * All currently alive processes, keyed by pid. Its size is the single
   * source of truth for "are there any processes left".
   */
  const processes = new Map();

  return {
    /**
     * Currently running process, used in self() function.
     * It is `null` whenever no process step is being executed.
     */
    runningProcess: null,

    /**
     * Scheduling is just putting a process to the run queue; it will be
     * executed on a following iteration of the handle loop. A process that
     * is not tracked yet is also registered as alive, so the loop does not
     * report idle while it is queued.
     *
     * @param {Process} process The process to enqueue.
     */
    schedule(process) {
      processes.set(process.pid, process);
      runQueue.unshift(process);
    },

    /**
     * Creates a new process and automatically schedules it.
     *
     * @param {Function} processLoop Generator function passed to `Process`.
     * @returns {number} The pid of the new process.
     */
    spawn(processLoop) {
      const process = new Process(processLoop);
      console.log(`* Spawning new process with pid #${process.pid}`);
      this.schedule(process);
      return process.pid;
    },

    /**
     * Terminates a process, given either its pid or the process object.
     * The process is removed from the alive map AND from the run queue, so
     * it will not be executed again. Terminating a process that is not
     * alive (unknown pid, or already terminated) is a no-op.
     *
     * @param {number|Process} process A pid or a process object.
     * @returns {number} The pid that was passed in / resolved.
     */
    terminate(process) {
      const pid = typeof process === 'number' ? process : process.pid;

      // Map#delete reports whether the pid was alive; bail out if not so
      // repeated calls cannot corrupt the bookkeeping.
      if (!processes.delete(pid)) {
        return pid;
      }

      // Drop every queued entry for this pid (iterate backwards so that
      // splicing does not skip elements).
      for (let i = runQueue.length - 1; i >= 0; i--) {
        if (runQueue[i].pid === pid) {
          runQueue.splice(i, 1);
        }
      }

      console.log(`* Process #${pid} is terminated`);
      return pid;
    },

    /**
     * Main scheduling loop: retrieves the next process from the run queue,
     * executes one step of it, and schedules it back in round robin mode.
     * Re-arms itself with a timer: 500ms when idle, 0ms otherwise.
     */
    handleLoop() {
      if (processes.size === 0) {
        console.log('Idle: no processes scheduled.');
        setTimeout(() => this.handleLoop(), 500);
        return;
      }

      const process = runQueue.pop();
      if (process) {
        this.runningProcess = process;
        try {
          const result = process.run();
          if (!processes.has(process.pid)) {
            // The process was terminated while it was running (e.g. it
            // terminated itself); do not put it back on the queue.
          } else if (result.done) {
            this.terminate(process);
          } else {
            this.schedule(process);
          }
        } catch (e) {
          console.log(
            `* Process ${process.pid} threw an exception, terminating...`
          );
          // Surface the actual error instead of silently dropping it.
          console.error(e);
          this.terminate(process);
        } finally {
          // No process is running between steps.
          this.runningProcess = null;
        }
      }

      setTimeout(() => this.handleLoop(), 0);
    },
  };
})();
/**
 * Convenience wrapper: creates a new process through the scheduler.
 *
 * @param {Function} process Generator function forwarded to `Scheduler.spawn`.
 * @returns {*} Whatever `Scheduler.spawn` returns.
 */
function spawn(process) {
  const spawned = Scheduler.spawn(process);
  return spawned;
}
/**
 * Helper that reads the pid of the process stored in
 * `Scheduler.runningProcess`.
 *
 * @returns {*} The `pid` property of the running process.
 */
function self() {
  const { runningProcess } = Scheduler;
  return runningProcess.pid;
}
/**
 * Demo helper exercising generator delegation: delegates to `nested`
 * via `yield*` and adds 10 to the value that generator returns.
 *
 * @param {*} value Forwarded to `nested`.
 * @returns {Generator} Generator whose return value is `nested`'s result + 10.
 */
function* calculate(value) {
  const nestedResult = yield* nested(value);
  return nestedResult + 10;
}
/**
 * Demo helper: a generator that yields nothing and returns `data + 10`.
 *
 * @param {*} data Value to which 10 is added.
 * @returns {Generator} Generator whose return value is `data + 10`.
 */
function* nested(data) {
  const increased = data + 10;
  return increased;
}
// Demo driver: every 3 seconds, spawn a batch of 3 new processes.
setInterval(() => {
  // Body shared by all processes of the batch; each spawn() call creates
  // its own generator from it.
  const demoProcess = function* () {
    console.log(`Process #${self()} first entry`);
    // Bare yield: give control back so the other processes get a turn.
    yield;
    const total = yield* calculate(self());
    console.log(`Process #${self()} second entry: ${total}`);
  };

  for (let spawned = 0; spawned < 3; spawned += 1) {
    spawn(demoProcess);
  }
}, 3000);
// Results:
// * Spawning new process with pid #1
// * Spawning new process with pid #2
// * Spawning new process with pid #3
// Process #1 first entry
// Process #2 first entry
// Process #3 first entry
// Process #1 second entry: 21
// * Process #1 is terminated
// Process #2 second entry: 22
// * Process #2 is terminated
// Process #3 second entry: 23
// * Process #3 is terminated
// Idle: no processes scheduled.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment