Skip to content

Instantly share code, notes, and snippets.

@artalar
Last active August 18, 2023 14:55
Show Gist options
  • Save artalar/085821ff3859828784efbdc6a810394a to your computer and use it in GitHub Desktop.
Save artalar/085821ff3859828784efbdc6a810394a to your computer and use it in GitHub Desktop.
/**
 * Resolves the type of the event object that `Target` delivers for the event
 * named `Type`.
 *
 * Resolution order:
 * 1. If `Target` has an `on${Type}` property (e.g. `onmessage` for
 *    `'message'`), the result is the first parameter of that property's type.
 * 2. Otherwise, if `Target` has an `onEvent` property shaped like
 *    `(type, cb) => any`, the result is the first parameter of `cb`.
 * 3. Otherwise the result is `never`.
 *
 * The `@ts-expect-error` directives below are deliberate: `Cb` is inferred
 * without a function constraint, so `Parameters<Cb>` is reported as an error
 * even though it resolves as intended.
 *
 * NOTE(review): branch 2 keys on a property literally named `onEvent`, which
 * the standard `EventTarget` interface does not declare — confirm which
 * targets this fallback is meant to cover.
 */
export type EventOfTarget<
Target extends EventTarget,
Type extends string,
> = Target extends Record<`on${Type}`, infer Cb>
? // @ts-expect-error `Cb extends Fn` broke the inference for some reason
Parameters<Cb>[0] // correct type
: Target extends Record<'onEvent', (type: Type, cb: infer Cb) => any>
? // @ts-expect-error `Cb extends Fn` broke the inference for some reason
Parameters<Cb>[0] // general type
: never
/**
 * Listens for `type` events on `target`, tying the listener to `ctx`.
 *
 * Two calling conventions are supported:
 * - With `cb`: the callback is attached via `addEventListener`, its removal is
 *   handed to `onCtxAbort(ctx, …)`, and the same removal function is returned
 *   so the caller can detach manually.
 * - Without `cb`: returns a promise that resolves with the next event of that
 *   type; the temporary listener is removed once the promise settles.
 *
 * The first overload of each pair infers the event type from the target's
 * `on<type>` property; the second lets the caller supply the event type
 * explicitly through the `Event` type parameter.
 *
 * NOTE(review): in the promise form nothing in this function settles the
 * promise if `ctx` aborts before the event fires — confirm callers do not
 * depend on a rejection in that case.
 */
// @ts-ignore
export const onEvent: {
  <
    Target extends EventTarget,
    Type extends Target extends Record<`on${infer Type}`, Fn> ? Type : string,
  >(
    ctx: Ctx,
    target: Target,
    type: Type,
  ): Promise<EventOfTarget<Target, Type>>
  <Event>(ctx: Ctx, target: EventTarget, type: string): Promise<Event>
  <
    Target extends EventTarget,
    Type extends Target extends Record<`on${infer Type}`, Fn> ? Type : string,
  >(
    ctx: Ctx,
    target: Target,
    type: Type,
    cb: (value: EventOfTarget<Target, Type>) => any,
  ): Unsubscribe
  <Event>(
    ctx: Ctx,
    target: EventTarget,
    type: string,
    cb: (value: Event) => any,
  ): Unsubscribe
} = (ctx: Ctx, target: EventTarget, type: string, listener: Fn) => {
  // Callback form: attach now, detach on demand or when the context aborts.
  if (listener) {
    target.addEventListener(type, listener)
    const removeListener = () => target.removeEventListener(type, listener)
    onCtxAbort(ctx, removeListener)
    return removeListener
  }

  // Promise form: re-enter the callback form with the promise's resolver as
  // the listener. The executor runs synchronously, so `release` is already
  // assigned by the time it is passed to `finally`.
  let release
  const nextEvent = new Promise((resolve) => {
    release = onEvent(ctx, target, type, resolve)
  })
  return nextEvent.finally(release)
}
// Single shared connection that every multiplexed stream is carried over.
const socket = new WebSocket("wss://example.com");

/**
 * Builds a stream of decoded socket messages for one logical subscription on
 * the shared `socket`.
 *
 * @param options.startMsg Sent (JSON-encoded) once the socket is open, to begin
 *   the subscription.
 * @param options.stopMsg Sent (JSON-encoded) from the stream's `finally` hook.
 * @param options.match Predicate applied to each decoded message; only
 *   messages it accepts are emitted.
 * @returns The object produced by the `socket.on(...)` operator chain —
 *   presumably an Observable, since callers in this file call `.subscribe` on it.
 */
function multiplex({ startMsg, stopMsg, match }) {
  if (socket.readyState !== WebSocket.OPEN) {
    // Not connected yet: wait for "open", then build the stream via the
    // connected path below.
    return socket
      .on("open")
      .flatMap(() => multiplex({ startMsg, stopMsg, match }));
  }

  socket.send(JSON.stringify(startMsg));
  return socket
    .on("message")
    // Decode before filtering: `match` inspects fields of the JSON payload
    // (e.g. `ticker`), which do not exist on the raw MessageEvent. Filtering
    // the event itself would reject every message.
    .map((e) => JSON.parse(e.data))
    .filter(match)
    .takeUntil(socket.on("close"))
    .takeUntil(socket.on("error"))
    .finally(() => {
      socket.send(JSON.stringify(stopMsg));
    });
}
/**
 * Creates the multiplexed stream for a single ticker symbol.
 *
 * Hands `multiplex` a "sub" message and an "unsub" message for `ticker`, plus
 * a predicate that accepts values whose `ticker` property equals it.
 *
 * @param ticker Symbol to subscribe to, e.g. "GOOG".
 * @returns Whatever `multiplex` returns for this subscription.
 */
function streamStock(ticker) {
  const startMsg = { ticker, type: "sub" };
  const stopMsg = { ticker, type: "unsub" };
  const match = (data) => data.ticker === ticker;

  return multiplex({ startMsg, stopMsg, match });
}
// Controller whose signal is handed to the subscription below.
const googController = new AbortController();

// Stream of GOOG trades; each emitted value is passed to `updateView`.
const googTrades = streamStock("GOOG");
const googSubscription = googTrades.subscribe({
  next: updateView,
  signal: googController.signal,
});
// Single shared connection that every multiplexed atom is fed from.
const socket = new WebSocket('wss://example.com')

/**
 * Creates an atom that receives the decoded socket messages of one logical
 * subscription on the shared `socket`.
 *
 * The wiring lives in the `onConnect` callback (presumably run when the atom
 * gains a subscriber — confirm against the library docs): it waits for the
 * socket to open if needed, sends `startMsg`, forwards matching messages into
 * the atom, aborts the context on socket "close"/"error", and sends `stopMsg`
 * when the context aborts.
 *
 * @param options.startMsg Sent (JSON-encoded) to begin the subscription.
 * @param options.stopMsg Sent (JSON-encoded) when the connection context aborts.
 * @param options.match Predicate applied to each decoded message; only
 *   messages it accepts are written to the atom.
 * @returns The atom, initialised to `null`, holding the latest matching message.
 */
function multiplex({ startMsg, stopMsg, match }) {
  const message = atom(null)

  onConnect(message, async (ctx) => {
    if (socket.readyState !== WebSocket.OPEN) {
      await onEvent(ctx, socket, 'open')
    }

    socket.send(JSON.stringify(startMsg))

    onEvent(ctx, socket, 'message', (event) => {
      // Decode before matching: `match` inspects fields of the JSON payload
      // (e.g. `ticker`), which do not exist on the raw MessageEvent. Testing
      // the event itself would reject every message.
      const data = JSON.parse(event.data)
      if (match(data)) message(ctx, data)
    })
    onEvent(ctx, socket, 'close', () => ctx.controller.abort())
    onEvent(ctx, socket, 'error', () => ctx.controller.abort())
    onCtxAbort(ctx, () => socket.send(JSON.stringify(stopMsg)))
  })

  return message
}
/**
 * Creates the multiplexed atom for a single ticker symbol.
 *
 * Hands `multiplex` a 'sub' message and an 'unsub' message for `ticker`, plus
 * a predicate that accepts values whose `ticker` property equals it.
 *
 * @param ticker Symbol to subscribe to, e.g. 'GOOG'.
 * @returns Whatever `multiplex` returns for this subscription.
 */
function streamStock(ticker) {
  const startMsg = { ticker, type: 'sub' }
  const stopMsg = { ticker, type: 'unsub' }
  const match = (data) => data.ticker === ticker

  return multiplex({ startMsg, stopMsg, match })
}
// Build the GOOG subscription target returned by `streamStock`.
const googTrades = streamStock('GOOG')
// Subscribe through the context, passing `updateView` as the subscriber
// callback. `ctx` and `updateView` are defined outside this snippet.
ctx.subscribe(googTrades, updateView)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment