Last active
January 2, 2022 14:48
-
-
Save mfbx9da4/9065154558bb2bbfc46864d7bc612391 to your computer and use it in GitHub Desktop.
Remote procedure calls (RPCs) using BroadcastChannel in Deno
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// ---- START IMPORTS ---- | |
/**
 * Extra context attached to a failed assertion: either a property bag
 * (optionally carrying its own `name`) or a bare `ErrorCode`. A bare code
 * that is a string at runtime is used as the error's `name`.
 */
export type AssertionExtra = (Record<string, unknown> & { name?: ErrorCode }) | ErrorCode

/**
 * Throws an `AssertionError` when `predicate` is falsy; otherwise returns and
 * narrows `predicate` to truthy for the type checker.
 *
 * @param predicate - Value that must be truthy.
 * @param message - Message of the thrown error.
 * @param extra - Properties copied onto the thrown error. When it carries no
 *   `name`, `ErrorCode.AssertionError` is used. The caller's object is never
 *   modified.
 * @throws AssertionError when `predicate` is falsy.
 */
export function assert(predicate: any, message: string, extra: AssertionExtra = {}): asserts predicate {
  if (predicate) return

  let details: Record<string, unknown>
  if (typeof extra === 'string') {
    details = { name: extra }
  } else if ('name' in extra) {
    // `in` also sees inherited names, exactly like the previous check did.
    details = { ...extra }
  } else {
    // Copy instead of writing `name` onto the object the caller handed in.
    details = { ...extra, name: ErrorCode.AssertionError }
  }
  throw new AssertionError(message, details)
}
/**
 * Error raised by failed assertions.
 *
 * The instance is named `'AssertionError'` unless `extra` supplies its own
 * `name`; every own enumerable property of `extra` is copied onto the error.
 */
export class AssertionError extends Error {
  constructor(message: string, extra: any = {}) {
    super(message)
    // Sources are applied left to right, so `extra` wins over the default name.
    Object.assign(this, { name: 'AssertionError' }, extra)
  }
}
/**
 * A promise whose `resolve` / `reject` functions are exposed as properties,
 * together with flags describing which of them was called first.
 *
 * Only the first call to `resolve` or `reject` has any effect; later calls
 * are ignored so the flags always agree with the underlying promise.
 *
 * Note: `isFulfilled` records that `resolve` was called. When `resolve` is
 * given a thenable, the underlying promise follows that thenable and may
 * still end up rejected.
 */
export class DeferredPromise<T = void, E = any> {
  /** True until `resolve` or `reject` has been called. */
  isPending = true
  /** True once `resolve` was the first settling call. */
  isFulfilled = false
  /** True once `reject` was the first settling call. */
  isRejected = false
  /** Rejects `promise` with `x`; a no-op after the first settling call. */
  reject: (x: E) => void = () => {}
  /** Resolves `promise` with `x`; a no-op after the first settling call. */
  resolve: (x: T | PromiseLike<T>) => void = () => {}
  /** The promise controlled by `resolve` and `reject`. */
  promise: Promise<T>

  constructor() {
    this.promise = new Promise<T>((res, rej) => {
      this.resolve = value => {
        // A promise settles once; keep the flags from flipping on later calls.
        if (!this.isPending) return
        this.isPending = false
        this.isFulfilled = true
        res(value)
      }
      this.reject = reason => {
        if (!this.isPending) return
        this.isPending = false
        this.isRejected = true
        rej(reason)
      }
    })
  }
}
// ---- END IMPORTS ---- | |
/** Generates the identifier used to pair a request with its reply. */
function uuid() {
  return crypto.randomUUID()
}

/** Union of the keys of `T` whose values are functions. */
export type MethodNames<T> = { [Key in keyof T]: T[Key] extends Function ? Key : never }[keyof T]

/** `T` restricted to its function-valued members. */
export type PickMethods<T> = Pick<T, MethodNames<T>>

/** Maps every member of `T` through `Promisify`. */
type PromisifyMethods<T> = { [Key in keyof T]: Promisify<T[Key]> }

/**
 * Turns a function type into one returning a promise. Functions that already
 * return a promise keep their return type; non-function types become `never`.
 */
type Promisify<T> = T extends (...args: infer A) => infer R
  ? R extends Promise<unknown>
    ? (...args: A) => R
    : (...args: A) => Promise<R>
  : never

/** Request message sent over the channel to invoke method `fn` with `args`. */
type Payload<T> = { requestId: string; fn: keyof T; args: unknown[] }
/**
 * Remote procedure calls over `BroadcastChannel`.
 *
 * One side calls `expose` to serve a set of methods on a channel id; any side
 * calls `get` with the same id to obtain a proxy whose method calls are sent
 * as request messages and resolved from the matching reply.
 *
 * Wire format: requests are `{ requestId, fn, args }`; replies are
 * `{ requestId, result }` on success or `{ requestId, error }` on failure.
 */
export class BroadcastMethods<T extends Record<string, Function>> {
  /** Milliseconds a call waits for its reply before rejecting. */
  timeout = 5000

  /**
   * @param opts - Optional settings. A falsy `timeout` (including `0`) keeps
   *   the 5000 ms default.
   */
  constructor(opts?: { timeout?: number }) {
    if (opts?.timeout) {
      this.timeout = opts.timeout
    }
  }

  /**
   * Serves `methods` on the channel named `channelId`.
   *
   * Messages without an `fn` field (for example replies travelling on the
   * same channel) and messages without a usable `requestId` are ignored.
   * Every other request is answered: validation failures and errors thrown by
   * the method are sent back as `{ error }` rather than escaping the handler.
   *
   * @param channelId - Non-empty channel name.
   * @param methods - Object whose function members may be invoked remotely.
   * @returns The channel that serves the methods; close it to stop serving.
   * @throws AssertionError when `channelId` is empty.
   */
  expose(channelId: string, methods: T) {
    assert(channelId, 'missing channelId')
    const chan = new BroadcastChannel(channelId)

    // postMessage throws when the payload cannot be structured-cloned or the
    // channel has been closed; report that instead of throwing in the handler.
    const tryPost = (message: unknown): boolean => {
      try {
        chan.postMessage(message)
        return true
      } catch {
        return false
      }
    }

    chan.onmessage = async e => {
      const data = e.data as Partial<Payload<T>> | null | undefined
      // Not a request: replies on this channel carry no `fn`.
      if (!data || typeof data !== 'object' || !('fn' in data)) return
      const { requestId, fn, args } = data
      // Without a request id there is nobody to address a reply to.
      if (!requestId || typeof requestId !== 'string') return

      let reply: { requestId: string; result: unknown } | { requestId: string; error: unknown }
      try {
        assert(typeof fn === 'string', `missing fn for "${channelId}"`)
        assert(Array.isArray(args), `missing args for "${channelId}"`)
        // Refuse names that only resolve through Object.prototype
        // (`constructor`, `toString`, ...) unless they were explicitly provided.
        const exposed = Object.prototype.hasOwnProperty.call(methods, fn) || !(fn in Object.prototype)
        assert(exposed && typeof methods[fn] === 'function', `missing method "${fn}" for "${channelId}"`)
        // Call through `methods` so the method keeps its `this`.
        reply = { requestId, result: await methods[fn](...args) }
      } catch (error) {
        reply = { requestId, error }
      }

      if (!tryPost(reply)) {
        // The result or thrown value could not be sent as-is; fall back to a
        // plain Error so the caller does not have to wait for its timeout.
        // If this fails too (closed channel), there is nobody left to notify.
        const cause = 'error' in reply && reply.error instanceof Error ? `: ${reply.error.message}` : ''
        tryPost({ requestId, error: new Error(`could not serialize response for "${channelId}"${cause}`) })
      }
    }
    return chan
  }

  /**
   * Returns a proxy for the methods exposed on `channelId`.
   *
   * Reading any string property except `then` yields a function that sends a
   * request and returns a promise. The promise resolves with the reply's
   * `result`, rejects with the reply's `error`, or rejects with a timeout
   * error after `this.timeout` milliseconds without a matching reply. Each
   * call opens its own channel and closes it when the call settles.
   *
   * @param channelId - Channel name passed to `expose` by the serving side.
   * @throws AssertionError (from the returned functions) when `channelId` is
   *   empty, and whatever `postMessage` throws when `args` cannot be cloned.
   */
  get(channelId: string): PromisifyMethods<T> {
    const exec = (fn: keyof T & string, args: unknown[]) => {
      assert(channelId, 'missing channelId')
      const requestId = uuid()
      const chan = new BroadcastChannel(channelId)
      const deferred = new DeferredPromise<unknown>()

      const timer = setTimeout(() => {
        if (!deferred.isPending) return
        chan.close()
        deferred.reject(new Error(`Timeout for "${channelId}" method: "${fn}"`))
      }, this.timeout)

      chan.onmessage = e => {
        if (!deferred.isPending) return chan.close()
        if (requestId !== e.data?.requestId) return
        // Settled: release the timer so it does not linger until it fires.
        clearTimeout(timer)
        chan.close()
        if ('error' in e.data) {
          deferred.reject(e.data.error)
        } else {
          deferred.resolve(e.data.result)
        }
      }

      const payload: Payload<T> = { requestId, fn, args }
      try {
        chan.postMessage(payload)
      } catch (error) {
        // Nothing was sent: release the channel and timer before rethrowing so
        // the timer cannot later reject a promise nobody holds.
        clearTimeout(timer)
        chan.close()
        throw error
      }
      return deferred.promise
    }

    return new Proxy(
      {},
      {
        get: (_, prop) => {
          // Symbols and `then` must read as absent; otherwise awaiting the
          // proxy (or returning it from an async function) would treat it as
          // a thenable and fire a bogus remote call.
          if (typeof prop !== 'string' || prop === 'then') return undefined
          return (...args: unknown[]) => exec(prop as keyof T & string, args)
        },
      },
    ) as PromisifyMethods<T>
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Example: expose an in-memory counter over a broadcast channel and drive it
 * through the proxy returned by `BroadcastMethods.get`.
 */
export async function usage() {
  // State that lives only in this instance; the methods below are the only
  // way to reach it.
  let tally = 0
  const counterMethods = {
    increment() {
      tally += 1
      return tally
    },
    decrement() {
      tally -= 1
      return tally
    },
    currentCount() {
      return tally
    },
  }

  // The channel id should be unique across all Deno instances.
  const counterId = 'some-uuid'
  const rpc = new BroadcastMethods<typeof counterMethods>()

  // Serve the counter methods under this specific `counterId`.
  rpc.expose(counterId, counterMethods)

  // Every call on the proxy is sent over the channel to its serving instance.
  // If nothing answers within the configured timeout, the call rejects with a
  // timeout error.
  const remoteCounter = rpc.get(counterId)
  const before = await remoteCounter.currentCount()
  console.log('currentCount', before)

  // Increment and decrement the counter on its serving instance.
  await remoteCounter.increment()
  await remoteCounter.increment()
  await remoteCounter.decrement()

  const after = await remoteCounter.currentCount()
  console.log('currentCount2', after)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment