Skip to content

Instantly share code, notes, and snippets.

@ccorcos
Created January 2, 2022 04:19
Show Gist options
  • Save ccorcos/74a21cc04e36627f719d232b0003a6f2 to your computer and use it in GitHub Desktop.
Save ccorcos/74a21cc04e36627f719d232b0003a6f2 to your computer and use it in GitHub Desktop.
import { strict as assert } from "assert"
import EventEmitter from "events"
import { describe, it } from "mocha"
import { IPCPeer } from "./IPCPeer"
function setupPeers<
A2B extends AnyFunctionMap = AnyFunctionMap,
B2A extends AnyFunctionMap = AnyFunctionMap
>() {
const aEvents = new EventEmitter()
const bEvents = new EventEmitter()
const a = new IPCPeer<A2B, B2A>({
send: (message) => {
bEvents.emit("event", message)
},
listen: (callback) => {
aEvents.on("event", callback)
return () => aEvents.off("event", callback)
},
})
const b = new IPCPeer<B2A, A2B>({
send: (message) => {
aEvents.emit("event", message)
},
listen: (callback) => {
bEvents.on("event", callback)
return () => bEvents.off("event", callback)
},
})
return { a, b }
}
describe("IPCPeer", () => {
it("works", async () => {
const { a, b } = setupPeers()
const stopB = b.answerFn("add", (x, y) => x + y)
const stopA = a.answerFn("double", (x) => x + x)
assert.equal(await a.callFn("add", 10, 2), 12)
assert.equal(await b.callFn("double", 10), 20)
})
it.skip("Stop listening works", async () => {})
it.skip("Destroy works", async () => {})
it("Works with proxy types", async () => {
type A = {
add(x: number, y: number): number
}
type B = {
double(x: number): number
}
const { a, b } = setupPeers<A, B>()
const stopB = b.answer.add((x, y) => x + y)
const stopA = a.answer.double((x) => x + x)
assert.equal(await a.call.add(10, 2), 12)
assert.equal(await b.call.double(10), 20)
})
it("Subscribes", async () => {
type A = {
count(n: number, cb: (count: number) => void): () => void
}
type B = {}
const { a, b } = setupPeers<A, B>()
const stopB = b.answer.count((initialCount, cb) => {
let n = initialCount
const timerId = setInterval(() => {
n += 1
cb(n)
}, 1)
return () => {
clearInterval(timerId)
}
})
const results: number[] = []
const unsub = await a.call.count(12, (n) => {
results.push(n)
})
assert.equal(typeof unsub, "function")
await sleep(10)
unsub()
const len = results.length
assert.deepEqual(results.slice(0, 5), [13, 14, 15, 16, 17])
await sleep(10)
assert.equal(len, results.length)
})
})
import { isFunction } from "lodash"
import { deserializeError, serializeError } from "serialize-error"
type IPCRequestMessage = {
type: "request"
fn: string
id: string
args: any[]
callbacks: number[]
}
type IPCResponseMessage = {
type: "response"
fn: string
id: string
data?: any
error?: any
}
type IPCCallbackMessage = {
type: "callback"
fn: string
id: string
callback: number
args: any[]
}
type IPCUnsubscribeMessage = {
type: "unsubscribe"
fn: string
id: string
}
export type IPCMessage =
| IPCRequestMessage
| IPCResponseMessage
| IPCCallbackMessage
| IPCUnsubscribeMessage
// Note: if you call answer twice for the same function, they will both get called and race.
// You can catch this in the listen argument to the contructor, but it's your responsibility.
export class IPCPeer<
CallAPI extends AnyFunctionMap = AnyFunctionMap,
AnswerAPI extends AnyFunctionMap = AnyFunctionMap
> {
constructor(
private config: {
send(message: IPCMessage): Promise<void> | void
listen(callback: (message: IPCMessage) => void): () => void
}
) {}
private listeners = new Set<() => void>()
call = createProxy<Caller<CallAPI>>((fn: any, ...args: any) =>
this.callFn(fn, ...args)
)
answer = createProxy<Answerer<AnswerAPI>>((fn: any, callback: any) =>
this.answerFn(fn, callback)
)
callFn(fn: string, ...args: any[]) {
const callbacks: number[] = []
const argsJson = args.map((arg, i) => {
if (isFunction(arg)) {
callbacks.push(i)
return null
}
return arg
})
const request: IPCRequestMessage = {
type: "request",
id: `${fn}-${Math.random()}`,
fn,
args: argsJson,
callbacks,
}
let stopListeningCallback: (() => void) | undefined
if (callbacks.length) {
stopListeningCallback = this.config.listen((message) => {
if (message.id !== request.id) return
if (message.type !== "callback") return
args[message.callback](...message.args)
})
this.listeners.add(stopListeningCallback)
}
const promise = new Promise<any>((resolve, reject) => {
const stopListening = this.config.listen((message) => {
if (message.id !== request.id) return
if (message.type !== "response") return
this.listeners.delete(stopListening)
stopListening()
if (message.error) {
reject(deserializeError(message.error))
} else {
if (callbacks.length) {
resolve(() => {
if (stopListeningCallback) {
stopListeningCallback()
this.listeners.delete(stopListeningCallback)
}
this.config.send({
type: "unsubscribe",
fn: request.fn,
id: request.id,
})
})
} else {
resolve(message.data)
}
}
})
this.listeners.add(stopListening)
this.config.send(request)
})
return promise
}
answerFn(fn: string, callback: (...args: any[]) => Promise<any>) {
const subscriptions = new Set<() => void>()
const stopListening = this.config.listen(async (message) => {
if (message.fn !== fn) return
if (message.type !== "request") return
try {
const args = [...message.args]
for (const i of message.callbacks) {
args[i] = async (...cbArgs: any[]) => {
await this.config.send({
type: "callback",
fn: message.fn,
id: message.id,
callback: i,
args: cbArgs,
})
}
}
const data = await callback(...args)
if (isFunction(data)) {
const unsubscribe = this.config.listen(async (msg) => {
if (message.id !== msg.id) return
if (msg.type !== "unsubscribe") return
unsubscribe()
subscriptions.delete(unsubscribe)
data()
})
subscriptions.add(unsubscribe)
const response: IPCResponseMessage = {
type: "response",
id: message.id,
fn,
}
await this.config.send(response)
} else {
const response: IPCResponseMessage = {
type: "response",
id: message.id,
fn,
data,
}
await this.config.send(response)
}
} catch (error) {
const response: IPCResponseMessage = {
type: "response",
id: message.id,
fn,
error: serializeError(error),
}
await this.config.send(response)
}
})
this.listeners.add(stopListening)
const cleanup = () => subscriptions.forEach((unsub) => unsub())
this.listeners.add(cleanup)
return () => {
this.listeners.delete(stopListening)
stopListening()
this.listeners.delete(cleanup)
this.listeners.add(cleanup)
}
}
destroy() {
this.listeners.forEach((fn) => fn())
this.listeners = new Set()
}
}
// Helpers.
type AnyFunction = (...args: any[]) => any
type Asyncify<F extends AnyFunction> = ReturnType<F> extends Promise<any>
? F
: (...args: Parameters<F>) => Promise<ReturnType<F>>
type AnyFunctionMap = { [key: string]: AnyFunction }
type Caller<T extends AnyFunctionMap> = {
[K in keyof T]: (...args: Parameters<T[K]>) => ReturnType<Asyncify<T[K]>>
}
type Answerer<T extends AnyFunctionMap> = {
[K in keyof T]: (fn: T[K] | Asyncify<T[K]>) => () => void
}
/** Warning: this is not soundly typed. */
function createProxy<K extends { [key: string]: AnyFunction }>(
fn: (key: keyof K, ...args: any[]) => any
) {
return new Proxy(
{},
{
get(target, prop: any) {
return (...args: any[]) => {
return (fn as any)(prop, ...args)
}
},
}
) as K
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment