Last active
March 26, 2026 00:56
-
-
Save rebolyte/e4c5f9e6a34fa82d1ac3ae6289c5ca5d to your computer and use it in GitHub Desktop.
Actor
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| export const Done = Symbol("Done"); | |
| export type CrashDirective = "resume" | "reset" | "stop" | "escalate"; | |
| export type ActorRef<M> = { | |
| send(message: M): void; | |
| stop(): void; | |
| readonly name: string; | |
| readonly signal: AbortSignal; | |
| }; | |
| export type Ctx<M> = { | |
| self: ActorRef<M>; | |
| signal: AbortSignal; | |
| spawn: <S2, M2>(config: SpawnConfig<S2, M2>) => ActorRef<M2>; | |
| lookup: <M2>(name: string) => ActorRef<M2>; | |
| }; | |
| export type SpawnConfig<S, M> = { | |
| name: string; | |
| initialState: S; | |
| onMessage: (state: S, message: M, ctx: Ctx<M>) => Promise<S | typeof Done>; | |
| onCrash?: (error: unknown, message: M, ctx: Ctx<M>) => CrashDirective; | |
| onDone?: (ctx: Ctx<M>) => void; | |
| }; | |
| export type ActorSystem = { | |
| spawn: <S, M>(config: SpawnConfig<S, M>) => ActorRef<M>; | |
| lookup: <M>(name: string) => ActorRef<M>; | |
| stop(): void; | |
| readonly signal: AbortSignal; | |
| }; | |
| export function createSystem(): ActorSystem { | |
| const registry = new Map<string, ActorRef<any>>(); | |
| const topLevel = new Set<ActorRef<any>>(); | |
| const ac = new AbortController(); | |
| function lookup<M>(name: string): ActorRef<M> { | |
| const ref = registry.get(name); | |
| if (!ref) throw new Error(`Actor not found: ${name}`); | |
| return ref; | |
| } | |
| function spawnActor<S, M>( | |
| config: SpawnConfig<S, M>, | |
| parent: Set<ActorRef<any>>, | |
| parentSignal: AbortSignal, | |
| onEscalate?: (error: unknown) => void, | |
| ): ActorRef<M> { | |
| if (registry.has(config.name)) { | |
| throw new Error(`Actor already exists: ${config.name}`); | |
| } | |
| let state = config.initialState; | |
| let processing = false; | |
| const queue: M[] = []; | |
| const children = new Set<ActorRef<any>>(); | |
| const childAc = new AbortController(); | |
| // Parent abort cascades to this actor | |
| parentSignal.addEventListener( | |
| "abort", | |
| () => childAc.abort(), | |
| { once: true }, | |
| ); | |
| function handleChildEscalation(error: unknown) { | |
| const directive = | |
| config.onCrash?.(error, undefined as any, ctx) ?? "stop"; | |
| applyDirective(directive, error); | |
| } | |
| function applyDirective(directive: CrashDirective, error: unknown) { | |
| switch (directive) { | |
| case "resume": | |
| break; | |
| case "reset": | |
| state = config.initialState; | |
| break; | |
| case "stop": | |
| self.stop(); | |
| break; | |
| case "escalate": | |
| if (onEscalate) { | |
| onEscalate(error); | |
| } else { | |
| console.error( | |
| `Unhandled escalation in actor "${config.name}":`, | |
| error, | |
| ); | |
| self.stop(); | |
| } | |
| break; | |
| } | |
| } | |
| const self: ActorRef<M> = { | |
| name: config.name, | |
| signal: childAc.signal, | |
| send(message: M) { | |
| if (childAc.signal.aborted) return; // dead letter | |
| queue.push(message); | |
| drain(); | |
| }, | |
| stop() { | |
| if (childAc.signal.aborted) return; | |
| childAc.abort(); // cascades to children via their listener | |
| queue.length = 0; | |
| registry.delete(config.name); | |
| parent.delete(self); | |
| }, | |
| }; | |
| const ctx: Ctx<M> = { | |
| self, | |
| signal: childAc.signal, | |
| spawn: <S2, M2>(childConfig: SpawnConfig<S2, M2>) => | |
| spawnActor(childConfig, children, childAc.signal, handleChildEscalation), | |
| lookup, | |
| }; | |
| async function drain() { | |
| if (processing || childAc.signal.aborted) return; | |
| processing = true; | |
| while (queue.length > 0 && !childAc.signal.aborted) { | |
| const msg = queue.shift()!; | |
| try { | |
| const result = await config.onMessage(state, msg, ctx); | |
| if (result === Done) { | |
| config.onDone?.(ctx); | |
| self.stop(); | |
| return; | |
| } | |
| state = result as S; | |
| } catch (error) { | |
| const directive = config.onCrash?.(error, msg, ctx) ?? "stop"; | |
| applyDirective(directive, error); | |
| if (childAc.signal.aborted) return; | |
| } | |
| } | |
| processing = false; | |
| } | |
| registry.set(config.name, self); | |
| parent.add(self); | |
| return self; | |
| } | |
| return { | |
| signal: ac.signal, | |
| spawn: <S, M>(config: SpawnConfig<S, M>) => { | |
| if (ac.signal.aborted) throw new Error("System is stopped"); | |
| return spawnActor(config, topLevel, ac.signal); | |
| }, | |
| lookup, | |
| stop() { | |
| ac.abort(); // cascades through entire tree | |
| }, | |
| }; | |
| } | |
| // -- ask -- | |
| export type AskOptions = { | |
| timeoutMs?: number; | |
| signal?: AbortSignal; | |
| }; | |
| export function ask<M, R>( | |
| system: ActorSystem, | |
| target: ActorRef<M>, | |
| factory: (replyTo: ActorRef<R>) => M, | |
| opts: AskOptions = {}, | |
| ): Promise<R> { | |
| return new Promise((resolve, reject) => { | |
| const timeout = AbortSignal.timeout(opts.timeoutMs ?? 5000); | |
| const signals = [timeout, system.signal]; | |
| if (opts.signal) signals.push(opts.signal); | |
| const combined = AbortSignal.any(signals); | |
| const ref = system.spawn<undefined, R>({ | |
| name: `_ask_${crypto.randomUUID()}`, | |
| initialState: undefined, | |
| onMessage: async (_state, reply) => { | |
| resolve(reply); | |
| return Done; | |
| }, | |
| }); | |
| combined.addEventListener( | |
| "abort", | |
| () => { | |
| ref.stop(); | |
| const reason = combined.reason; | |
| if (reason instanceof DOMException && reason.name === "TimeoutError") { | |
| reject(new Error("Ask timed out")); | |
| } else { | |
| reject(new Error("Ask aborted")); | |
| } | |
| }, | |
| { once: true }, | |
| ); | |
| target.send(factory(ref)); | |
| }); | |
| } | |
| // -- fromCallback -- | |
| type CallbackAPI<TSend, TReceive = TSend> = { | |
| send: (msg: TSend) => void; | |
| receive: (handler: (msg: TReceive) => void) => void; | |
| }; | |
| type CallbackSetup<TSend, TReceive = TSend> = ( | |
| api: CallbackAPI<TSend, TReceive>, | |
| signal: AbortSignal, | |
| ) => void; | |
| export function fromCallback<TSend, TReceive = TSend>( | |
| system: ActorSystem, | |
| name: string, | |
| setup: CallbackSetup<TSend, TReceive>, | |
| ): ActorRef<TReceive> { | |
| let listener: ((msg: TReceive) => void) | null = null; | |
| const ref = system.spawn<null, TReceive | typeof Done>({ | |
| name, | |
| initialState: null, | |
| onMessage: async (_state, msg) => { | |
| if (msg === Done) return Done; | |
| listener?.(msg as TReceive); | |
| return null; | |
| }, | |
| }); | |
| setup( | |
| { | |
| send: (msg: TSend) => ref.send(msg as any), | |
| receive: (handler) => { | |
| listener = handler; | |
| }, | |
| }, | |
| ref.signal, | |
| ); | |
| return ref as ActorRef<TReceive>; | |
| } | |
| // -- fromPromise -- | |
| type PromiseResult<T> = | |
| | { type: "resolved"; value: T } | |
| | { type: "rejected"; error: unknown }; | |
| export function fromPromise<T>( | |
| system: ActorSystem, | |
| name: string, | |
| work: (signal: AbortSignal) => Promise<T>, | |
| target: ActorRef<PromiseResult<T>>, | |
| ): ActorRef<never> { | |
| const ref = system.spawn<null, never>({ | |
| name, | |
| initialState: null, | |
| onMessage: async () => null, | |
| }); | |
| work(ref.signal).then( | |
| (value) => { | |
| if (!ref.signal.aborted) { | |
| target.send({ type: "resolved", value }); | |
| } | |
| ref.stop(); | |
| }, | |
| (error) => { | |
| if (!ref.signal.aborted) { | |
| target.send({ type: "rejected", error }); | |
| } | |
| ref.stop(); | |
| }, | |
| ); | |
| return ref; | |
| } | |
| // -- fromObservable -- | |
| type Subscribable<T> = { | |
| subscribe( | |
| next: (value: T) => void, | |
| error?: (err: unknown) => void, | |
| complete?: () => void, | |
| ): { unsubscribe(): void }; | |
| }; | |
| export function fromObservable<T>( | |
| system: ActorSystem, | |
| name: string, | |
| observable: Subscribable<T>, | |
| target: ActorRef<T>, | |
| ): ActorRef<typeof Done> { | |
| const ref = system.spawn<null, typeof Done>({ | |
| name, | |
| initialState: null, | |
| onMessage: async (_state, msg) => { | |
| if (msg === Done) return Done; | |
| return null; | |
| }, | |
| }); | |
| const sub = observable.subscribe( | |
| (value) => target.send(value), | |
| (err) => { | |
| console.error(`Observable error in "${name}":`, err); | |
| ref.stop(); | |
| }, | |
| () => ref.stop(), | |
| ); | |
| // Actor death → unsubscribe. One line replaces onDone + mutable tracking. | |
| ref.signal.addEventListener("abort", () => sub.unsubscribe(), { once: true }); | |
| return ref as ActorRef<typeof Done>; | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| export const Done = Symbol("Done"); | |
| export type CrashDirective = "resume" | "reset" | "stop" | "escalate"; | |
| export type ActorRef<M> = { | |
| send(message: M): void; | |
| stop(): void; | |
| readonly name: string; | |
| }; | |
| export type Ctx<M> = { | |
| self: ActorRef<M>; | |
| spawn: <S2, M2>(config: SpawnConfig<S2, M2>) => ActorRef<M2>; | |
| lookup: <M2>(name: string) => ActorRef<M2>; | |
| }; | |
| export type SpawnConfig<S, M> = { | |
| name: string; | |
| initialState: S; | |
| onMessage: (state: S, message: M, ctx: Ctx<M>) => Promise<S | typeof Done>; | |
| onCrash?: (error: unknown, message: M, ctx: Ctx<M>) => CrashDirective; | |
| onDone?: (ctx: Ctx<M>) => void; | |
| }; | |
| export type ActorSystem = { | |
| spawn: <S, M>(config: SpawnConfig<S, M>) => ActorRef<M>; | |
| lookup: <M>(name: string) => ActorRef<M>; | |
| stop(): void; | |
| }; | |
| export function createSystem(): ActorSystem { | |
| const registry = new Map<string, ActorRef<any>>(); | |
| const topLevel = new Set<ActorRef<any>>(); | |
| let stopped = false; | |
| function lookup<M>(name: string): ActorRef<M> { | |
| const ref = registry.get(name); | |
| if (!ref) throw new Error(`Actor not found: ${name}`); | |
| return ref; | |
| } | |
| function spawnActor<S, M>( | |
| config: SpawnConfig<S, M>, | |
| parent: Set<ActorRef<any>>, | |
| onEscalate?: (error: unknown) => void, | |
| ): ActorRef<M> { | |
| if (registry.has(config.name)) { | |
| throw new Error(`Actor already exists: ${config.name}`); | |
| } | |
| let state = config.initialState; | |
| let processing = false; | |
| let alive = true; | |
| const queue: M[] = []; | |
| const children = new Set<ActorRef<any>>(); | |
| function handleChildEscalation(error: unknown) { | |
| // child escalated to us — run our own onCrash as if we crashed | |
| const directive = config.onCrash?.(error, undefined as any, ctx) ?? "stop"; | |
| applyDirective(directive, error); | |
| } | |
| function applyDirective(directive: CrashDirective, error: unknown) { | |
| switch (directive) { | |
| case "resume": | |
| // keep state, keep going — drop the bad message | |
| break; | |
| case "reset": | |
| // restart with initial state, keep children, keep mailbox | |
| state = config.initialState; | |
| break; | |
| case "stop": | |
| self.stop(); | |
| break; | |
| case "escalate": | |
| if (onEscalate) { | |
| onEscalate(error); | |
| } else { | |
| // top-level with no handler — stop | |
| console.error(`Unhandled escalation in actor "${config.name}":`, error); | |
| self.stop(); | |
| } | |
| break; | |
| } | |
| } | |
| const self: ActorRef<M> = { | |
| name: config.name, | |
| send(message: M) { | |
| if (!alive) return; // dead letter | |
| queue.push(message); | |
| drain(); | |
| }, | |
| stop() { | |
| if (!alive) return; | |
| alive = false; | |
| for (const child of children) child.stop(); | |
| children.clear(); | |
| queue.length = 0; | |
| registry.delete(config.name); | |
| parent.delete(self); | |
| }, | |
| }; | |
| const ctx: Ctx<M> = { | |
| self, | |
| spawn: <S2, M2>(childConfig: SpawnConfig<S2, M2>) => | |
| spawnActor(childConfig, children, handleChildEscalation), | |
| lookup, | |
| }; | |
| async function drain() { | |
| if (processing || !alive) return; | |
| processing = true; | |
| while (queue.length > 0 && alive) { | |
| const msg = queue.shift()!; | |
| try { | |
| const result = await config.onMessage(state, msg, ctx); | |
| if (result === Done) { | |
| config.onDone?.(ctx); | |
| self.stop(); | |
| return; | |
| } | |
| state = result as S; | |
| } catch (error) { | |
| const directive = config.onCrash?.(error, msg, ctx) ?? "stop"; | |
| applyDirective(directive, error); | |
| if (!alive) return; | |
| } | |
| } | |
| processing = false; | |
| } | |
| registry.set(config.name, self); | |
| parent.add(self); | |
| return self; | |
| } | |
| return { | |
| spawn: <S, M>(config: SpawnConfig<S, M>) => { | |
| if (stopped) throw new Error("System is stopped"); | |
| return spawnActor(config, topLevel); | |
| }, | |
| lookup, | |
| stop() { | |
| stopped = true; | |
| for (const ref of topLevel) ref.stop(); | |
| topLevel.clear(); | |
| }, | |
| }; | |
| } | |
| // ask: send a message expecting a reply, using a temp actor | |
| export function ask<M, R>( | |
| system: ActorSystem, | |
| target: ActorRef<M>, | |
| factory: (replyTo: ActorRef<R>) => M, | |
| timeoutMs = 5000, | |
| ): Promise<R> { | |
| return new Promise((resolve, reject) => { | |
| const timer = setTimeout(() => { | |
| ref.stop(); | |
| reject(new Error("Ask timed out")); | |
| }, timeoutMs); | |
| const ref = system.spawn<undefined, R>({ | |
| name: `_ask_${crypto.randomUUID()}`, | |
| initialState: undefined, | |
| onMessage: async (_state, reply) => { | |
| clearTimeout(timer); | |
| resolve(reply); | |
| return Done; | |
| }, | |
| }); | |
| target.send(factory(ref)); | |
| }); | |
| } | |
| // -- fromCallback -- | |
| // You get a `send` function to push messages in from callback-land. | |
| // Return a cleanup function (removeListener, clearInterval, etc). | |
| type CallbackAPI<TSend, TReceive = TSend> = { | |
| send: (msg: TSend) => void; | |
| receive: (handler: (msg: TReceive) => void) => void; | |
| }; | |
| type CallbackSetup<TSend, TReceive = TSend> = ( | |
| api: CallbackAPI<TSend, TReceive>, | |
| ) => (() => void) | void; | |
| export function fromCallback<TSend, TReceive = TSend>( | |
| system: ActorSystem, | |
| name: string, | |
| setup: CallbackSetup<TSend, TReceive>, | |
| ): ActorRef<TReceive> { | |
| let cleanup: (() => void) | void; | |
| let listener: ((msg: TReceive) => void) | null = null; | |
| const ref = system.spawn<null, TReceive | typeof Done>({ | |
| name, | |
| initialState: null, | |
| onMessage: async (_state, msg) => { | |
| if (msg === Done) { | |
| cleanup?.(); | |
| return Done; | |
| } | |
| listener?.(msg as TReceive); | |
| return null; | |
| }, | |
| }); | |
| cleanup = setup({ | |
| send: (msg: TSend) => ref.send(msg as any), | |
| receive: (handler) => { listener = handler; }, | |
| }); | |
| return ref as ActorRef<TReceive>; | |
| } | |
| // -- fromPromise -- | |
| // Spawns an actor that resolves a promise, then sends result to `target`. | |
| // Actor self-terminates after. Rejects go to onError or kill the actor. | |
| type PromiseResult<T> = { type: "resolved"; value: T } | { type: "rejected"; error: unknown }; | |
| export function fromPromise<T>( | |
| system: ActorSystem, | |
| name: string, | |
| work: () => Promise<T>, | |
| target: ActorRef<PromiseResult<T>>, | |
| ): ActorRef<never> { | |
| const ref = system.spawn<null, never>({ | |
| name, | |
| initialState: null, | |
| onMessage: async () => null, // no inbound messages expected | |
| }); | |
| work().then( | |
| (value) => { | |
| target.send({ type: "resolved", value }); | |
| ref.stop(); | |
| }, | |
| (error) => { | |
| target.send({ type: "rejected", error }); | |
| ref.stop(); | |
| }, | |
| ); | |
| return ref; | |
| } | |
| // -- fromObservable -- | |
| // Subscribes to anything with a `subscribe(next, error?, complete?)` shape. | |
| // Forwards emissions as messages to `target`. Cleans up on stop. | |
| type Subscribable<T> = { | |
| subscribe( | |
| next: (value: T) => void, | |
| error?: (err: unknown) => void, | |
| complete?: () => void, | |
| ): { unsubscribe(): void }; | |
| }; | |
| export function fromObservable<T>( | |
| system: ActorSystem, | |
| name: string, | |
| observable: Subscribable<T>, | |
| target: ActorRef<T>, | |
| ): ActorRef<typeof Done> { | |
| let sub: { unsubscribe(): void } | null = null; | |
| const ref = system.spawn<null, typeof Done>({ | |
| name, | |
| initialState: null, | |
| onMessage: async (_state, msg) => { | |
| if (msg === Done) { | |
| sub?.unsubscribe(); | |
| return Done; | |
| } | |
| return null; | |
| }, | |
| onDone: () => sub?.unsubscribe(), | |
| }); | |
| sub = observable.subscribe( | |
| (value) => target.send(value), | |
| (err) => { | |
| console.error(`Observable error in "${name}":`, err); | |
| ref.stop(); | |
| }, | |
| () => ref.stop(), | |
| ); | |
| return ref as ActorRef<typeof Done>; | |
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { | |
| createSystem, | |
| ask, | |
| Done, | |
| fromCallback, | |
| fromPromise, | |
| fromObservable, | |
| type ActorRef, | |
| } from "./actors"; | |
| const system = createSystem(); | |
| // -- Example: ping-pong with a counter that stops after 3 -- | |
| type PingMsg = { type: "ping"; replyTo: ActorRef<PongMsg> }; | |
| type PongMsg = { type: "pong"; count: number }; | |
| const ponger = system.spawn<null, PingMsg>({ | |
| name: "ponger", | |
| initialState: null, | |
| onMessage: async (_state, msg) => { | |
| msg.replyTo.send({ type: "pong", count: 1 }); | |
| return null; | |
| }, | |
| }); | |
| const pinger = system.spawn<number, PongMsg>({ | |
| name: "pinger", | |
| initialState: 0, | |
| onMessage: async (count, _msg, ctx) => { | |
| const next = count + 1; | |
| console.log(`pinger: received pong #${next}`); | |
| if (next >= 3) return Done; | |
| ponger.send({ type: "ping", replyTo: ctx.self }); | |
| return next; | |
| }, | |
| onDone: () => console.log("pinger: done after 3 pongs"), | |
| }); | |
| // kick it off | |
| ponger.send({ type: "ping", replyTo: pinger }); | |
| // -- Example: ask pattern (now with signal composition) -- | |
| type MathMsg = { op: "add"; a: number; b: number; replyTo: ActorRef<number> }; | |
| const calculator = system.spawn<null, MathMsg>({ | |
| name: "calculator", | |
| initialState: null, | |
| onMessage: async (_state, msg) => { | |
| msg.replyTo.send(msg.a + msg.b); | |
| return null; | |
| }, | |
| }); | |
| // Basic ask — timeout via options object | |
| const result = await ask<MathMsg, number>(system, calculator, (replyTo) => ({ | |
| op: "add", | |
| a: 17, | |
| b: 25, | |
| replyTo, | |
| }), { timeoutMs: 3000 }); | |
| console.log(`ask result: 17 + 25 = ${result}`); | |
| // Ask with external cancellation — e.g. user navigated away | |
| const userAc = new AbortController(); | |
| try { | |
| const result2 = await ask<MathMsg, number>(system, calculator, (replyTo) => ({ | |
| op: "add", | |
| a: 1, | |
| b: 2, | |
| replyTo, | |
| }), { timeoutMs: 5000, signal: userAc.signal }); | |
| console.log(`ask result: 1 + 2 = ${result2}`); | |
| } catch (e) { | |
| console.log("ask was cancelled or timed out:", (e as Error).message); | |
| } | |
| // -- Example: parent-child hierarchy (observe lifecycle via signal) -- | |
| const parent = system.spawn<null, string>({ | |
| name: "parent", | |
| initialState: null, | |
| onMessage: async (_state, msg, ctx) => { | |
| if (msg === "spawn-child") { | |
| const child = ctx.spawn({ | |
| name: "child", | |
| initialState: 0, | |
| onMessage: async (n, _msg) => { | |
| console.log(`child: message #${n + 1}`); | |
| return n + 1; | |
| }, | |
| }); | |
| // External code can observe child lifecycle via signal | |
| child.signal.addEventListener("abort", () => { | |
| console.log("child was stopped (observed via signal)"); | |
| }, { once: true }); | |
| } | |
| return null; | |
| }, | |
| }); | |
| parent.send("spawn-child"); | |
| // stopping parent cascades via AbortController — no manual children.clear() | |
| setTimeout(() => { | |
| parent.stop(); | |
| console.log("parent stopped (child stopped with it)"); | |
| }, 100); | |
| // -- Example: onMessage using ctx.signal for cancellable work -- | |
| const fetcher = system.spawn<null, { url: string }>({ | |
| name: "fetcher", | |
| initialState: null, | |
| onMessage: async (_state, msg, ctx) => { | |
| // If this actor is stopped mid-fetch, the request is aborted | |
| const res = await fetch(msg.url, { signal: ctx.signal }); | |
| console.log(`fetched ${msg.url}: ${res.status}`); | |
| return null; | |
| }, | |
| onCrash: (error) => { | |
| if (error instanceof DOMException && error.name === "AbortError") { | |
| return "stop"; // expected — actor was killed, no drama | |
| } | |
| return "resume"; // transient fetch error, try next message | |
| }, | |
| }); | |
| // -- fromCallback: wrap setInterval (signal replaces return cleanup) -- | |
| const ticker = fromCallback<number>(system, "ticker", ({ send }, signal) => { | |
| let i = 0; | |
| const id = setInterval(() => send(++i), 1000); | |
| // signal handles cleanup — no return function needed | |
| signal.addEventListener("abort", () => clearInterval(id), { once: true }); | |
| }); | |
| setTimeout(() => ticker.stop(), 3500); | |
| // -- fromCallback: bidirectional WebSocket (signal for all listeners) -- | |
| type WsOut = { type: "send"; data: string }; | |
| type WsIn = { type: "message"; data: string }; | |
| const wsActor = fromCallback<WsIn, WsOut>(system, "ws", ({ send, receive }, signal) => { | |
| const socket = new WebSocket("ws://localhost:3000"); | |
| // All listeners auto-removed when actor stops | |
| socket.addEventListener("open", () => { | |
| console.log("ws connected"); | |
| }, { signal }); | |
| socket.addEventListener("message", (e) => { | |
| send({ type: "message", data: e.data }); | |
| }, { signal }); | |
| socket.addEventListener("error", (e) => { | |
| console.error("ws error:", e); | |
| }, { signal }); | |
| // Outbound: actor system → socket | |
| receive((msg) => { | |
| if (msg.type === "send") socket.send(msg.data); | |
| }); | |
| // Close socket on actor teardown | |
| signal.addEventListener("abort", () => socket.close(), { once: true }); | |
| }); | |
| wsActor.send({ type: "send", data: "hello server" }); | |
| // -- fromPromise: one-shot async work (signal enables cancellation) -- | |
| const logger = system.spawn<null, any>({ | |
| name: "logger", | |
| initialState: null, | |
| onMessage: async (_s, msg) => { | |
| console.log("logger received:", msg); | |
| return null; | |
| }, | |
| }); | |
| fromPromise( | |
| system, | |
| "fetch-user", | |
| async (signal) => { | |
| // signal lets us abort if the actor is killed mid-flight | |
| const res = await fetch("https://api.example.com/user/1", { signal }); | |
| return res.json(); | |
| }, | |
| logger, | |
| ); | |
| // logger receives: { type: "resolved", value: { id: 1, name: "James" } } | |
| // -- fromObservable: any Subscribable shape -- | |
| const fakeObservable = { | |
| subscribe( | |
| next: (v: string) => void, | |
| _error?: (e: unknown) => void, | |
| complete?: () => void, | |
| ) { | |
| next("hello"); | |
| next("world"); | |
| setTimeout(() => { | |
| next("async one"); | |
| complete?.(); | |
| }, 100); | |
| return { unsubscribe() {} }; | |
| }, | |
| }; | |
| fromObservable(system, "words", fakeObservable, logger); | |
| // logger receives: "hello", "world", "async one" | |
| // -- Graceful shutdown: one abort tears down everything -- | |
| process.on("SIGTERM", () => { | |
| console.log("SIGTERM received, shutting down..."); | |
| system.stop(); // one call → entire actor tree aborts | |
| }); | |
| // You can also observe system-level shutdown from anywhere | |
| system.signal.addEventListener("abort", () => { | |
| console.log("system shutdown complete"); | |
| }, { once: true }); | |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment