Skip to content

Instantly share code, notes, and snippets.

@dmsnell
Created August 17, 2020 16:24
Show Gist options
  • Save dmsnell/96145c5d98dc1985ac66962b87c4acc8 to your computer and use it in GitHub Desktop.
Save dmsnell/96145c5d98dc1985ac66962b87c4acc8 to your computer and use it in GitHub Desktop.
JS worker processing sync and async inbound messages sequentially, inspired by Erlang gen_server
import { MessageChannel, MessagePort } from 'worker_threads';
type ServerDescription<K extends string> = {
[type in K]: (...args: unknown[]) => unknown;
}
type ServerHandler<M extends ServerDescription<keyof M>, S> = <T extends keyof M>(msg: T, state: S) => [ReturnType<M[T]>, S] | Promise<[ReturnType<M[T]>, S]>;
class GenericServer<M extends ServerDescription<keyof M>, S = undefined> {
count = 0;
inbound: MessagePort;
outbound: MessagePort;
queue: ([keyof M] | [keyof M, number])[] = [];
state: S;
working = false;
constructor(
readonly handler: ServerHandler<M, S>,
initialState: S
) {
const { port1, port2 } = new MessageChannel();
this.handler = handler;
this.inbound = port1;
this.outbound = port2;
this.state = initialState;
port2.addListener( 'message', message => {
this.queue.push( message );
this.loop();
} );
port1.unref();
port2.unref();
}
loop() {
if ( this.working ) {
return;
}
const queuedItem = this.queue.shift();
if ( ! queuedItem ) {
return;
}
this.working = true;
const [ message, id ] = queuedItem;
Promise.resolve( this.handler( message, this.state ) ).then( ( [ response, nextState ] ) => {
this.state = nextState;
this.working = false;
if ( 'undefined' !== typeof id ) {
this.outbound.postMessage( [ id, response ] );
}
if ( this.queue.length > 0 ) {
this.loop();
}
} );
}
call<Action extends keyof M>(message: Action): Promise<M[Action]> {
const id = this.count++;
return new Promise( resolve => {
this.inbound.postMessage( [ message, id ] );
const listener = ( [ nextId, response ]: [ number, M[Action] ] ) => {
if ( nextId !== id ) {
return;
}
this.inbound.removeListener( 'message', listener );
resolve( response );
}
this.inbound.addListener( 'message', listener );
} );
}
cast<Action extends keyof M>(message: Action): void {
this.inbound.postMessage( [ message ] );
}
}
const pingServer = new GenericServer<{ping: () => string}>(
(msg, state) => new Promise( resolve => setTimeout( () => resolve( [ 'pong', state ] ), 1000 ) ),
undefined
);
pingServer.cast( 'ping')
pingServer.cast( 'ping' );
pingServer.cast( 'ping' );
pingServer.call( 'ping' ).then( data => {
console.log( 'resolved with:', data );
} );
type Counter = {
down: () => number;
up: () => number;
reset: () => boolean;
}
const counterHandler: ServerHandler<Counter, number> = (msg, state) => {
switch ( msg ) {
case "up":
return [state + 1, state + 1];
case "down":
return [state - 1, state - 1];
case "reset":
return [true, 0];
}
};
const counter = new GenericServer<Counter, number>(counterHandler, 0);
(async () => {
counter.cast( 'up' );
counter.cast( 'up' );
counter.cast( 'up' );
counter.cast( 'up' );
counter.cast( 'up' );
counter.cast( 'up' );
console.log('before');
const count = counter.call( 'down' );
console.log('after');
console.log( await counter.call( 'up' ) );
console.log( await counter.call( 'up' ) );
console.log( await counter.call( 'up' ) );
console.log( await counter.call( 'up' ) );
console.log( await counter.call( 'up' ) );
console.log( await counter.call( 'down' ) );
counter.cast( 'down' );
counter.cast( 'down' );
counter.cast( 'down' );
const reset = counter.call( 'reset' );
counter.cast( 'down' );
console.log( await counter.call( 'down' ) );
console.log( await counter.call( 'down' ) );
console.log( await counter.call( 'up' ) );
console.log( await counter.call( 'up' ) );
console.log( await counter.call( 'up' ) );
console.log( await counter.call( 'up' ) );
console.log( await counter.call( 'down' ) );
console.log( await counter.call( 'up' ) );
})()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment