Last active
August 27, 2020 21:56
-
-
Save dmsnell/a4cebf052a110d49a02719ca2b63006e to your computer and use it in GitHub Desktop.
Process Primitives
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { spawn, send } = require('./process.js');

// Example: a counter process.
//
// The process keeps a running count and logs it after every message it
// understands:
//   'inc'        -> add one
//   'dec'        -> subtract one
//   ['add', n]   -> add n (n must be a finite number)
// Any other message is ignored.
const counter = spawn(async (receive) => {
  let count = 0;

  // The process lives for as long as the program does; each iteration
  // suspends on `receive()` until a message is available.
  while (true) {
    const message = await receive();

    if (message === 'inc') {
      console.log(++count);
    } else if (message === 'dec') {
      console.log(--count);
    } else if (
      Array.isArray(message) &&
      message[0] === 'add' &&
      Number.isFinite(message[1])
    ) {
      // The operand is validated above: adding a string or `undefined`
      // would silently turn the count into a string or NaN.
      count += message[1];
      console.log(count);
    }
  }
});

send(counter, 'inc');
send(counter, 'inc');
send(counter, 'inc');
send(counter, 'dec');
send(counter, ['add', 1248]);
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
exports.__esModule = true; | |
exports.send = exports.spawn = void 0; | |
var worker_threads_1 = require("worker_threads"); | |
// Registry of every spawned process, keyed by pid. `send` looks the target
// process up here to reach its message queue and wake-up port.
var process_table = new Map();
// Most recently issued pid; the first pid handed out is 1.
var pid_counter = 0;
// Returns a fresh pid by pre-incrementing the counter.
var next_pid = function () { return ++pid_counter; };
exports.spawn = function (runner) { | |
var pid = next_pid(); | |
var _a = new worker_threads_1.MessageChannel(), port1 = _a.port1, port2 = _a.port2; | |
var process = { | |
port: port1, | |
queue: [] | |
}; | |
var receive = function () { | |
var selectors = arguments; | |
var timeout = selectors.length > 0 && Number.isFinite(selectors[selectors.length - 1]) | |
? selectors[selectors.length - 1] | |
: null; | |
return new Promise(function (resolve) { | |
var listener; | |
var timer = timeout && setTimeout(function () { | |
port2.removeListener('message', listener); | |
resolve(selectors.length > 1 ? [null, 'timeout'] : 'timeout'); | |
}, timeout); | |
listener = function () { | |
if (process.queue.length <= 0) { | |
port2.once('message', listener); | |
return; | |
} | |
var message = process.queue[process.queue.length - 1]; | |
// with no selectors we can return the message itself | |
if (selectors.length === 0 || (timeout !== null && selectors.length === 1)) { | |
process.queue.pop(); | |
clearTimeout(timer); | |
resolve(message); | |
return; | |
} | |
// otherwise we have to check if the message is selected | |
for (var q = process.queue.length - 1; q >= 0; q--) { | |
var message_1 = process.queue[q]; | |
for (var i = 0; i < selectors.length && 'function' === typeof selectors[i]; i++) { | |
var selector = selectors[i]; | |
if (selector(message_1)) { | |
process.queue.splice(q, 1); | |
clearTimeout(timer); | |
resolve([i, message_1]); | |
return; | |
} | |
} | |
} | |
// if we got here then we had no selector match | |
// so we'll have to wait for the next one | |
port2.once('message', listener); | |
}; | |
listener(); | |
}); | |
}; | |
process_table.set(pid, process); | |
setTimeout(function () { | |
try { | |
runner(receive); | |
} | |
catch (e) { | |
// pass | |
} | |
}, 0); | |
return pid; | |
}; | |
exports.send = function (pid, message) { | |
var process = process_table.get(pid); | |
if (!process) { | |
return false; | |
} | |
process.queue.push(message); | |
process.port.postMessage(null); | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { MessageChannel, MessagePort } from 'worker_threads'; | |
import type { Pid } from './fidget'; | |
/** Predicate applied to a queued message; a truthy result selects it. */
type Selector = (message: unknown) => unknown;

/**
 * Function handed to every process for reading its mailbox.
 *
 * Accepts zero or more selector functions, optionally followed by a timeout
 * in milliseconds. The resolved value depends on the call form, so it is
 * typed as `unknown` and must be narrowed by the caller.
 */
type ReceiveFunction = (...args: Array<Selector | number>) => Promise<unknown>;

/** Body of a process. Async functions are accepted; the return value is ignored. */
type ProcessFunction = (receive: ReceiveFunction) => void;

/** Bookkeeping kept for each spawned process. */
type Process = {
  /** Port that `send` posts to in order to wake a pending `receive`. */
  port: MessagePort;
  /** Messages sent to the process that have not been received yet. */
  queue: unknown[];
};

/** Registry of every spawned process, keyed by pid. */
const process_table = new Map<Pid, Process>();

/** Most recently issued pid; the first pid handed out is 1. */
let pid_counter = 0;

/** Returns a fresh pid by pre-incrementing the counter. */
const next_pid = (): Pid => ++pid_counter;
/**
 * Starts a new process and returns its pid.
 *
 * The process is registered in `process_table` immediately, but `runner` is
 * only invoked on a later timer tick (`setTimeout(..., 0)`), so messages sent
 * synchronously right after `spawn` are already queued when it starts.
 *
 * `runner` is called with a `receive` function:
 *   - `receive()` resolves with the oldest queued message.
 *   - `receive(ms)` does the same, or resolves with `'timeout'` after `ms`.
 *   - `receive(sel0, sel1, ..., [ms])` resolves with `[index, message]` for
 *     the oldest queued message accepted by one of the selector functions
 *     (selectors are tried in argument order), or `[null, 'timeout']` once
 *     `ms` elapses.
 * A trailing finite number is always interpreted as the timeout in
 * milliseconds.
 *
 * @param runner - function executed as the body of the process
 * @returns the pid of the new process
 */
export const spawn = (runner: ProcessFunction): Pid => {
  const pid = next_pid();
  const { port1, port2 } = new MessageChannel();
  const process: Process = {
    port: port1,
    queue: [],
  };

  const receive: ReceiveFunction = (...args: unknown[]): Promise<unknown> => {
    const last = args[args.length - 1];
    const timeout = typeof last === 'number' && Number.isFinite(last) ? last : null;
    // No selectors were given when there are no arguments at all, or when
    // the only argument is the timeout.
    const takeAny = args.length === 0 || (timeout !== null && args.length === 1);

    return new Promise<unknown>((resolve) => {
      let timer: ReturnType<typeof setTimeout> | undefined;

      const settle = (value: unknown): void => {
        if (timer !== undefined) {
          clearTimeout(timer);
        }
        resolve(value);
      };

      const listener = (): void => {
        if (process.queue.length > 0) {
          if (takeAny) {
            // Deliver in send order: oldest message first.
            settle(process.queue.shift());
            return;
          }

          // Selective receive: scan the mailbox oldest-first and take the
          // first message accepted by any selector.
          for (let q = 0; q < process.queue.length; q++) {
            const message = process.queue[q];
            for (let i = 0; i < args.length; i++) {
              const selector = args[i];
              if (typeof selector !== 'function') {
                break;
              }
              if (selector(message)) {
                process.queue.splice(q, 1);
                settle([i, message]);
                return;
              }
            }
          }
        }

        // Nothing deliverable yet; check again when `send` signals the port.
        port2.once('message', listener);
      };

      // Compare against null rather than truthiness so that a timeout of 0
      // ("do not wait") is honoured instead of waiting forever.
      if (timeout !== null) {
        timer = setTimeout(() => {
          port2.removeListener('message', listener);
          resolve(args.length > 1 ? [null, 'timeout'] : 'timeout');
        }, timeout);
      }

      listener();
    });
  };

  process_table.set(pid, process);

  setTimeout(() => {
    // Wrapping the call in a promise captures both a synchronous throw and
    // the rejection of an async runner; a plain try/catch only sees the former.
    new Promise<void>((resolve) => resolve(runner(receive))).catch((error: unknown) => {
      console.error(`process ${pid} terminated with an error:`, error);
    });
  }, 0);

  return pid;
};
/**
 * Queues `message` for the process identified by `pid` and signals its port
 * so that a pending `receive` re-checks the queue.
 *
 * @param pid - pid returned by `spawn`
 * @param message - value to deliver
 * @returns `true` when the message was queued, `false` when no process with
 *          that pid is registered
 */
export const send = (pid: Pid, message: unknown): boolean => {
  const process = process_table.get(pid);
  if (!process) {
    return false;
  }

  process.queue.push(message);
  // The payload travels through the shared queue; the port message is only a
  // wake-up signal, hence `null`.
  process.port.postMessage(null);
  return true;
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment