Created
January 2, 2022 04:19
-
-
Save ccorcos/74a21cc04e36627f719d232b0003a6f2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { strict as assert } from "assert" | |
import EventEmitter from "events" | |
import { describe, it } from "mocha" | |
import { IPCPeer } from "./IPCPeer" | |
function setupPeers< | |
A2B extends AnyFunctionMap = AnyFunctionMap, | |
B2A extends AnyFunctionMap = AnyFunctionMap | |
>() { | |
const aEvents = new EventEmitter() | |
const bEvents = new EventEmitter() | |
const a = new IPCPeer<A2B, B2A>({ | |
send: (message) => { | |
bEvents.emit("event", message) | |
}, | |
listen: (callback) => { | |
aEvents.on("event", callback) | |
return () => aEvents.off("event", callback) | |
}, | |
}) | |
const b = new IPCPeer<B2A, A2B>({ | |
send: (message) => { | |
aEvents.emit("event", message) | |
}, | |
listen: (callback) => { | |
bEvents.on("event", callback) | |
return () => bEvents.off("event", callback) | |
}, | |
}) | |
return { a, b } | |
} | |
describe("IPCPeer", () => { | |
it("works", async () => { | |
const { a, b } = setupPeers() | |
const stopB = b.answerFn("add", (x, y) => x + y) | |
const stopA = a.answerFn("double", (x) => x + x) | |
assert.equal(await a.callFn("add", 10, 2), 12) | |
assert.equal(await b.callFn("double", 10), 20) | |
}) | |
it.skip("Stop listening works", async () => {}) | |
it.skip("Destroy works", async () => {}) | |
it("Works with proxy types", async () => { | |
type A = { | |
add(x: number, y: number): number | |
} | |
type B = { | |
double(x: number): number | |
} | |
const { a, b } = setupPeers<A, B>() | |
const stopB = b.answer.add((x, y) => x + y) | |
const stopA = a.answer.double((x) => x + x) | |
assert.equal(await a.call.add(10, 2), 12) | |
assert.equal(await b.call.double(10), 20) | |
}) | |
it("Subscribes", async () => { | |
type A = { | |
count(n: number, cb: (count: number) => void): () => void | |
} | |
type B = {} | |
const { a, b } = setupPeers<A, B>() | |
const stopB = b.answer.count((initialCount, cb) => { | |
let n = initialCount | |
const timerId = setInterval(() => { | |
n += 1 | |
cb(n) | |
}, 1) | |
return () => { | |
clearInterval(timerId) | |
} | |
}) | |
const results: number[] = [] | |
const unsub = await a.call.count(12, (n) => { | |
results.push(n) | |
}) | |
assert.equal(typeof unsub, "function") | |
await sleep(10) | |
unsub() | |
const len = results.length | |
assert.deepEqual(results.slice(0, 5), [13, 14, 15, 16, 17]) | |
await sleep(10) | |
assert.equal(len, results.length) | |
}) | |
}) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { isFunction } from "lodash" | |
import { deserializeError, serializeError } from "serialize-error" | |
type IPCRequestMessage = { | |
type: "request" | |
fn: string | |
id: string | |
args: any[] | |
callbacks: number[] | |
} | |
type IPCResponseMessage = { | |
type: "response" | |
fn: string | |
id: string | |
data?: any | |
error?: any | |
} | |
type IPCCallbackMessage = { | |
type: "callback" | |
fn: string | |
id: string | |
callback: number | |
args: any[] | |
} | |
type IPCUnsubscribeMessage = { | |
type: "unsubscribe" | |
fn: string | |
id: string | |
} | |
export type IPCMessage = | |
| IPCRequestMessage | |
| IPCResponseMessage | |
| IPCCallbackMessage | |
| IPCUnsubscribeMessage | |
// Note: if you call answer twice for the same function, they will both get called and race. | |
// You can catch this in the listen argument to the contructor, but it's your responsibility. | |
export class IPCPeer< | |
CallAPI extends AnyFunctionMap = AnyFunctionMap, | |
AnswerAPI extends AnyFunctionMap = AnyFunctionMap | |
> { | |
constructor( | |
private config: { | |
send(message: IPCMessage): Promise<void> | void | |
listen(callback: (message: IPCMessage) => void): () => void | |
} | |
) {} | |
private listeners = new Set<() => void>() | |
call = createProxy<Caller<CallAPI>>((fn: any, ...args: any) => | |
this.callFn(fn, ...args) | |
) | |
answer = createProxy<Answerer<AnswerAPI>>((fn: any, callback: any) => | |
this.answerFn(fn, callback) | |
) | |
callFn(fn: string, ...args: any[]) { | |
const callbacks: number[] = [] | |
const argsJson = args.map((arg, i) => { | |
if (isFunction(arg)) { | |
callbacks.push(i) | |
return null | |
} | |
return arg | |
}) | |
const request: IPCRequestMessage = { | |
type: "request", | |
id: `${fn}-${Math.random()}`, | |
fn, | |
args: argsJson, | |
callbacks, | |
} | |
let stopListeningCallback: (() => void) | undefined | |
if (callbacks.length) { | |
stopListeningCallback = this.config.listen((message) => { | |
if (message.id !== request.id) return | |
if (message.type !== "callback") return | |
args[message.callback](...message.args) | |
}) | |
this.listeners.add(stopListeningCallback) | |
} | |
const promise = new Promise<any>((resolve, reject) => { | |
const stopListening = this.config.listen((message) => { | |
if (message.id !== request.id) return | |
if (message.type !== "response") return | |
this.listeners.delete(stopListening) | |
stopListening() | |
if (message.error) { | |
reject(deserializeError(message.error)) | |
} else { | |
if (callbacks.length) { | |
resolve(() => { | |
if (stopListeningCallback) { | |
stopListeningCallback() | |
this.listeners.delete(stopListeningCallback) | |
} | |
this.config.send({ | |
type: "unsubscribe", | |
fn: request.fn, | |
id: request.id, | |
}) | |
}) | |
} else { | |
resolve(message.data) | |
} | |
} | |
}) | |
this.listeners.add(stopListening) | |
this.config.send(request) | |
}) | |
return promise | |
} | |
answerFn(fn: string, callback: (...args: any[]) => Promise<any>) { | |
const subscriptions = new Set<() => void>() | |
const stopListening = this.config.listen(async (message) => { | |
if (message.fn !== fn) return | |
if (message.type !== "request") return | |
try { | |
const args = [...message.args] | |
for (const i of message.callbacks) { | |
args[i] = async (...cbArgs: any[]) => { | |
await this.config.send({ | |
type: "callback", | |
fn: message.fn, | |
id: message.id, | |
callback: i, | |
args: cbArgs, | |
}) | |
} | |
} | |
const data = await callback(...args) | |
if (isFunction(data)) { | |
const unsubscribe = this.config.listen(async (msg) => { | |
if (message.id !== msg.id) return | |
if (msg.type !== "unsubscribe") return | |
unsubscribe() | |
subscriptions.delete(unsubscribe) | |
data() | |
}) | |
subscriptions.add(unsubscribe) | |
const response: IPCResponseMessage = { | |
type: "response", | |
id: message.id, | |
fn, | |
} | |
await this.config.send(response) | |
} else { | |
const response: IPCResponseMessage = { | |
type: "response", | |
id: message.id, | |
fn, | |
data, | |
} | |
await this.config.send(response) | |
} | |
} catch (error) { | |
const response: IPCResponseMessage = { | |
type: "response", | |
id: message.id, | |
fn, | |
error: serializeError(error), | |
} | |
await this.config.send(response) | |
} | |
}) | |
this.listeners.add(stopListening) | |
const cleanup = () => subscriptions.forEach((unsub) => unsub()) | |
this.listeners.add(cleanup) | |
return () => { | |
this.listeners.delete(stopListening) | |
stopListening() | |
this.listeners.delete(cleanup) | |
this.listeners.add(cleanup) | |
} | |
} | |
destroy() { | |
this.listeners.forEach((fn) => fn()) | |
this.listeners = new Set() | |
} | |
} | |
// Helpers. | |
type AnyFunction = (...args: any[]) => any | |
type Asyncify<F extends AnyFunction> = ReturnType<F> extends Promise<any> | |
? F | |
: (...args: Parameters<F>) => Promise<ReturnType<F>> | |
type AnyFunctionMap = { [key: string]: AnyFunction } | |
type Caller<T extends AnyFunctionMap> = { | |
[K in keyof T]: (...args: Parameters<T[K]>) => ReturnType<Asyncify<T[K]>> | |
} | |
type Answerer<T extends AnyFunctionMap> = { | |
[K in keyof T]: (fn: T[K] | Asyncify<T[K]>) => () => void | |
} | |
/** Warning: this is not soundly typed. */ | |
function createProxy<K extends { [key: string]: AnyFunction }>( | |
fn: (key: keyof K, ...args: any[]) => any | |
) { | |
return new Proxy( | |
{}, | |
{ | |
get(target, prop: any) { | |
return (...args: any[]) => { | |
return (fn as any)(prop, ...args) | |
} | |
}, | |
} | |
) as K | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment