Created
August 29, 2020 20:31
-
-
Save dmsnell/c0ccf00737c8611f4e8807be601b9d6c to your computer and use it in GitHub Desktop.
50k `async` JS processes
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Counter process: holds a single running number and updates or reports it
 * in response to `[action, args]` messages taken from its mailbox.
 *
 * Recognised actions:
 *  - 'inc'    — add one to the count
 *  - 'dec'    — subtract one from the count
 *  - 'add'    — add `args` to the count
 *  - 'report' — call `send(args, count)`, i.e. `args` is the destination pid
 * Any other action is ignored.
 *
 * @param {object}   context
 * @param {Function} context.send     called as `send(pid, message)`
 * @param {Function} context.receive  returns a promise for the next message
 * @returns {Promise<never>} loops forever; only settles if `receive` or `send` throws
 */
export const counter = async ({send, receive}) => {
  let count = 0;

  while (true) {
    const [action, args] = await receive();

    switch (action) {
      case 'inc':
        count++;
        break;

      case 'dec':
        count--;
        break;

      case 'add':
        count += args;
        break;

      case 'report':
        send(args, count);
        break;

      default:
        // unrecognised actions are dropped on purpose
        break;
    }
  }
};
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { Worker } = require('worker_threads'); | |
/**
 * Starts a worker thread running `./worker-loader.js` and performs the
 * handshake that hands back its two message ports.
 *
 * The returned promise resolves with a small API bound to that worker:
 *  - `send(pid, message)`      — posts `[pid, message]` on the worker's sender port
 *  - `spawn(path, name, args)` — asks the worker to start the function exported
 *                                as `name` from module `path`; resolves with the
 *                                pid the worker replies with
 *  - `on(f)`                   — registers `f` for messages arriving on the sender port
 *
 * @param {string} id  label passed to the worker as `workerData.id`
 * @returns {Promise<{send: Function, spawn: Function, on: Function}>}
 *          rejects if the worker emits 'error' before the handshake completes
 */
const spawnWorker = id => new Promise( (resolve, reject) => {
  const worker = new Worker('./worker-loader.js', { workerData: { id } } );

  // Without this a worker that fails to boot would leave the caller waiting forever.
  worker.once('error', reject);

  // The loader answers the handshake exactly once, so a one-shot listener is enough.
  worker.once('message', ({sender, spawner}) => {
    // Handshake done: stop routing errors into the already-settled promise so
    // later worker failures surface as before instead of being swallowed.
    worker.removeListener('error', reject);

    let requestCount = 0;

    // requestId -> resolver for the matching spawn() promise. One dispatcher
    // serves every in-flight request instead of one port listener per request.
    const pending = new Map();

    spawner.on('message', ({messageId, pid}) => {
      const settle = pending.get(messageId);
      if ( ! settle ) {
        return;
      }

      pending.delete(messageId);
      settle(pid);
    });

    const spawn = (path, name, args) => new Promise( settle => {
      const requestId = requestCount++;
      pending.set(requestId, settle);
      spawner.postMessage([requestId, path, name, args]);
    } );

    const send = (pid, message) => sender.postMessage([pid, message]);
    const on = f => sender.on('message', f);

    resolve({send, spawn, on});
  });

  // Any message triggers the loader's handshake; the payload is ignored.
  worker.postMessage(null);
} );
/**
 * Small smoke test: two workers, three counter processes, a handful of messages.
 *
 * From the messages sent below the counters should report:
 *  - counter  (worker a): 2 to pid 15, then 240 to pid 24
 *  - counter2 (worker a): -1 to pid 83
 *  - counter3 (worker b): 4813 to pid 33
 *
 * Note that this function registers no `on` handler, so it does not itself
 * observe those replies.
 *
 * @returns {Promise<void>}
 */
const go = async () => {
  // Promise.all keeps both start-ups parallel and funnels a failure in either
  // one into a single rejection.
  const [a, b] = await Promise.all([
    spawnWorker('a'),
    spawnWorker('b'),
  ]);

  const [counter, counter2, counter3] = await Promise.all([
    a.spawn('./counter.mjs', 'counter', []),
    a.spawn('./counter.mjs', 'counter', []),
    b.spawn('./counter.mjs', 'counter', []),
  ]);

  a.send(counter, ['inc']);
  b.send(counter3, ['add', 4813]);
  b.send(counter3, ['report', 33]);
  a.send(counter2, ['dec']);
  a.send(counter, ['inc']);
  a.send(counter, ['report', 15]);
  a.send(counter, ['add', 238]);
  a.send(counter2, ['report', 83]);
  a.send(counter, ['report', 24]);
};
// go(); | |
/**
 * Builds the array `[0, 1, …, n - 1]`.
 *
 * @param {number} n  number of entries
 * @returns {number[]}
 */
const range = n => Array.from({ length: n }, (_, i) => i);
/**
 * Stress run: spreads COUNT counter processes across THREADS workers, tells
 * each counter to add 1 and report, and sums the reports. Once COUNT reports
 * have arrived it prints the total and exits the process.
 *
 * @returns {Promise<void>} rejects if starting a worker or spawning a counter fails
 */
const go2 = async () => {
  const COUNT = 50000;
  const THREADS = 16;
  const perWorker = COUNT / THREADS;

  const workers = await Promise.all(range(THREADS).map(i => spawnWorker(`worker-${ i }`)));

  let total = 0;
  let count = 0;

  // map + Promise.all (rather than forEach with an async callback) so that a
  // failure inside any worker's loop rejects go2 instead of going unhandled.
  await Promise.all( workers.map( async ( worker, i ) => {
    // Each report arrives as [pid, message]; only the reported value is needed.
    worker.on( ([, message]) => {
      total += message;
      count++;

      if (count >= COUNT) {
        console.log({total});
        process.exit(0);
      }
    } );

    for (let j = 0; j < perWorker; j++) {
      const counter = await worker.spawn('./counter.mjs', 'counter', []);
      worker.send(counter, ['add', 1 ] );
      // the worker index doubles as the "pid" the counter reports back to
      worker.send(counter, ['report', i ] );
    }
  } ) );
};

go2().catch( error => {
  console.error(error);
  process.exit(1);
} );
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { MessageChannel, MessagePort, Worker } from 'worker_threads'; | |
import type { Pid } from './fidget'; | |
/** Predicate handed to `receive` to pick a particular message out of the mailbox. */
type Selector = (message: unknown) => unknown;

/**
 * Waits for a message from the process mailbox.
 *
 * - No arguments: resolves with the oldest queued message.
 * - A trailing finite number is a timeout in milliseconds; on expiry the
 *   promise resolves with `'timeout'` (or `[null, 'timeout']` when selectors
 *   were also given).
 * - Leading functions are selectors; the promise resolves with
 *   `[selectorIndex, message]` for the first queued message a selector accepts.
 */
type ReceiveFunction = (...selectors: Array<Selector | number>) => Promise<unknown>;

/** Entry point of a process; it is handed its mailbox reader, a sender and its own pid. */
type ProcessFunction = (args: {
  receive: ReceiveFunction;
  send: (pid: Pid, message: unknown) => void;
  self: Pid;
}) => void;

/** Bookkeeping kept per live process. */
type Process = {
  /** Port that `send` posts a wake-up on after queueing a message. */
  port: MessagePort;
  /** Messages delivered to the process and not yet received, oldest first. */
  queue: unknown[]
};

/** Every process spawned in this thread, keyed by pid. */
const process_table = new Map<Pid, Process>();

/** Last pid handed out; pids start at 1. */
let pid_counter = 0;

/** Allocates the next unused pid. */
const next_pid = (): Pid => ++pid_counter;
/**
 * Creates a new process: allocates a pid and a mailbox, registers it in the
 * process table and schedules `runner` to start on a later tick of the event loop.
 *
 * The `receive` function given to the runner behaves as follows:
 *  - `receive()` resolves with the oldest queued message.
 *  - `receive(ms)` does the same but resolves with `'timeout'` if nothing
 *    arrives within `ms` milliseconds; `receive(0)` is therefore a poll.
 *  - `receive(sel0, sel1, …[, ms])` resolves with `[index, message]` for the
 *    first queued message accepted by one of the selector functions, leaving
 *    other messages queued; on timeout it resolves with `[null, 'timeout']`.
 *
 * Failures of the runner, whether thrown synchronously or as a rejected
 * promise, are deliberately contained so that one crashing process does not
 * take the thread down.
 *
 * @param runner  function to run as the process body
 * @param sender  optional replacement for the local `send`, given to the runner as `send`
 * @returns the pid of the new process
 */
export function spawn(runner: ProcessFunction, sender?: (pid: Pid, message: unknown) => void): Pid {
  const pid = next_pid();
  const { port1, port2 } = new MessageChannel();
  const entry: Process = {
    port: port1,
    queue: []
  };

  const receive: ReceiveFunction = (...selectors: unknown[]) => {
    const last = selectors[selectors.length - 1];
    const timeout = typeof last === 'number' && Number.isFinite(last) ? last : null;

    // With no selector functions (nothing at all, or only a timeout) the
    // oldest message is returned as-is.
    const takeOldest = selectors.length === 0 || (timeout !== null && selectors.length === 1);

    return new Promise<unknown>( resolve => {
      let timer: ReturnType<typeof setTimeout> | undefined;

      const listener = (): void => {
        if (entry.queue.length <= 0) {
          port2.once('message', listener);
          return;
        }

        if (takeOldest) {
          const message = entry.queue.shift();
          clearTimeout(timer);
          resolve(message);
          return;
        }

        // otherwise look for the first queued message some selector accepts
        for (let q = 0; q < entry.queue.length; q++) {
          const message = entry.queue[q];

          for (let i = 0; i < selectors.length; i++) {
            const selector = selectors[i];
            // selectors end at the first non-function (the trailing timeout)
            if (typeof selector !== 'function') {
              break;
            }

            if (selector(message)) {
              entry.queue.splice(q, 1);
              clearTimeout(timer);
              resolve([i, message]);
              return;
            }
          }
        }

        // nothing matched yet: wait for the next delivery and rescan
        port2.once('message', listener);
      };

      // Compare against null rather than testing truthiness so that a
      // timeout of 0 still arms the timer instead of waiting forever.
      if (timeout !== null) {
        timer = setTimeout( () => {
          port2.removeListener('message', listener);
          resolve( selectors.length > 1 ? [null, 'timeout'] : 'timeout');
        }, timeout );
      }

      listener();
    } );
  };

  process_table.set( pid, entry );

  const run = async (): Promise<void> => {
    try {
      // awaiting covers async runners too; a bare try/catch only sees
      // synchronous throws and would let their rejections escape
      await runner( { receive, send: sender ?? send, self: pid } );
    } catch {
      // pass: a crashing process is contained on purpose
    }
  };

  setTimeout( () => {
    void run();
  }, 0 );

  return pid;
}
/**
 * Delivers `message` to the mailbox of the local process `pid` and posts a
 * wake-up on its port so a pending `receive` re-checks the queue.
 *
 * @param pid      destination process
 * @param message  value to enqueue
 * @returns `true` if the message was queued, `false` if no such process exists
 */
export function send(pid: Pid, message: unknown): boolean {
  const target = process_table.get(pid);
  if (!target) {
    return false;
  }

  target.queue.push(message);
  // the payload lives in the queue; the port message is only a notification
  target.port.postMessage(null);
  return true;
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { MessageChannel, parentPort, workerData } = require('worker_threads') | |
const { send: localSend, spawn } = require('./process.js'); | |
const { id } = workerData;

/*
 * Worker bootstrap. On the first message from the parent this creates two
 * channels and transfers one end of each back to the parent:
 *
 *  - sender:  carries `[pid, message]` pairs. Pairs arriving from the parent
 *             are delivered to local processes; local processes use the same
 *             channel as their `send`, so their messages go out to the parent.
 *  - spawner: carries spawn requests `[messageId, modulePath, functionName, args]`
 *             and replies `{messageId, pid}`. `args` is accepted but not
 *             currently passed on to the spawned function.
 */
parentPort.once('message', () => {
  const sender = new MessageChannel();
  const spawner = new MessageChannel();

  const send = (pid, message) => {
    sender.port1.postMessage([pid, message]);
  };

  sender.port1.on( 'message', ([pid, message]) => {
    localSend(pid, message);
  } );

  spawner.port1.on( 'message', ([ messageId, modulePath, functionName ]) => {
    import( modulePath )
      .then( moduleObject => {
        const runner = moduleObject[ functionName ];

        // Catch a bad export name here; otherwise the caller would be handed
        // a pid for a process that can never run.
        if ( 'function' !== typeof runner ) {
          throw new TypeError(`${ modulePath } does not export a function named "${ functionName }"`);
        }

        spawner.port1.postMessage({messageId, pid: spawn( runner, send )});
      } )
      .catch( error => {
        // Always answer the request so the requester is not left waiting,
        // and say which worker failed.
        console.error(`[${ id }] could not spawn "${ functionName }" from ${ modulePath }:`, error);
        spawner.port1.postMessage({messageId, pid: null, error: String(error)});
      } );
  } );

  parentPort.postMessage({
    sender: sender.port2,
    spawner: spawner.port2
  }, [ sender.port2, spawner.port2 ]);
} );
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment