Skip to content

Instantly share code, notes, and snippets.

@rebolyte
Last active March 26, 2026 00:56
Show Gist options
  • Select an option

  • Save rebolyte/e4c5f9e6a34fa82d1ac3ae6289c5ca5d to your computer and use it in GitHub Desktop.

Select an option

Save rebolyte/e4c5f9e6a34fa82d1ac3ae6289c5ca5d to your computer and use it in GitHub Desktop.
Actor
/**
 * Sentinel an `onMessage` handler returns instead of a new state to say the
 * actor is finished. The system then calls `onDone` and stops the actor.
 */
export const Done = Symbol("Done");
/**
 * What the system does with an actor after its `onMessage` threw:
 * - "resume":   keep the current state and continue with the next message
 * - "reset":    replace the state with `initialState` and continue
 * - "stop":     stop the actor
 * - "escalate": hand the error to the parent's `onCrash`; an actor without a
 *               parent logs the error and stops
 */
export type CrashDirective = "resume" | "reset" | "stop" | "escalate";
/** Handle to a running actor that accepts messages of type `M`. */
export type ActorRef<M> = {
// Enqueues a message; silently dropped once the actor has stopped.
send(message: M): void;
// Stops the actor; calling it on an already-stopped actor does nothing.
stop(): void;
// Name the actor was spawned under (unique within its system).
readonly name: string;
// Aborted when the actor stops, so outside code can observe its lifetime.
readonly signal: AbortSignal;
};
/** Capabilities handed to an actor's own callbacks. */
export type Ctx<M> = {
// Reference to the actor itself, e.g. to pass along as a reply address.
self: ActorRef<M>;
// Same signal as `self.signal`; pass it to cancellable work such as fetch.
signal: AbortSignal;
// Spawns a child whose lifetime is bounded by this actor's lifetime.
spawn: <S2, M2>(config: SpawnConfig<S2, M2>) => ActorRef<M2>;
// Finds any registered actor by name; throws if none is registered.
lookup: <M2>(name: string) => ActorRef<M2>;
};
/** Definition of an actor with state `S` and messages `M`. */
export type SpawnConfig<S, M> = {
// Registry key; spawning a second actor with a name still in use throws.
name: string;
// State passed to the first `onMessage` call and restored by "reset".
initialState: S;
// Handles one message at a time; resolves to the next state or `Done`.
onMessage: (state: S, message: M, ctx: Ctx<M>) => Promise<S | typeof Done>;
// Chooses the directive after `onMessage` throws; defaults to "stop" when omitted.
// Also invoked for errors escalated by a child, in which case `message` is undefined.
onCrash?: (error: unknown, message: M, ctx: Ctx<M>) => CrashDirective;
// Called once after `onMessage` returned `Done`, before the actor is stopped.
onDone?: (ctx: Ctx<M>) => void;
};
/** Root of an actor tree. */
export type ActorSystem = {
// Spawns a top-level actor; throws once the system has been stopped.
spawn: <S, M>(config: SpawnConfig<S, M>) => ActorRef<M>;
// Finds a registered actor by name; throws if none is registered.
lookup: <M>(name: string) => ActorRef<M>;
// Aborts the system signal, which stops every actor in the tree.
stop(): void;
// Aborted when `stop()` is called.
readonly signal: AbortSignal;
};
/**
 * Creates an actor system whose lifetimes are driven by AbortSignals.
 *
 * Every actor owns an AbortController. Aborting it (through `stop()`, through
 * the parent's signal, or through `system.stop()`) is the single teardown
 * path: the mailbox is emptied and the actor is removed from the name
 * registry and from its parent's child set.
 *
 * @returns the system handle used to spawn, look up and stop actors.
 */
export function createSystem(): ActorSystem {
  const registry = new Map<string, ActorRef<any>>();
  const topLevel = new Set<ActorRef<any>>();
  const ac = new AbortController();

  /** Returns the registered actor called `name`; throws if there is none. */
  function lookup<M>(name: string): ActorRef<M> {
    const ref = registry.get(name);
    if (!ref) throw new Error(`Actor not found: ${name}`);
    return ref;
  }

  /**
   * Creates and registers one actor.
   *
   * @param config        actor definition
   * @param parent        set the new actor is tracked in (siblings)
   * @param parentSignal  aborting this signal stops the new actor
   * @param onEscalate    receives errors this actor escalates; absent for top-level actors
   * @throws Error when `config.name` is already registered
   */
  function spawnActor<S, M>(
    config: SpawnConfig<S, M>,
    parent: Set<ActorRef<any>>,
    parentSignal: AbortSignal,
    onEscalate?: (error: unknown) => void,
  ): ActorRef<M> {
    if (registry.has(config.name)) {
      throw new Error(`Actor already exists: ${config.name}`);
    }
    let state = config.initialState;
    let processing = false;
    const queue: M[] = [];
    const children = new Set<ActorRef<any>>();
    const childAc = new AbortController();

    // Teardown hangs off the abort event rather than off stop(), so it also
    // runs when the actor dies because its parent (or the system) aborted.
    // Without this, cascaded actors stayed in the registry forever.
    childAc.signal.addEventListener(
      "abort",
      () => {
        queue.length = 0;
        if (registry.get(config.name) === self) registry.delete(config.name);
        parent.delete(self);
      },
      { once: true },
    );

    // Parent abort cascades to this actor. Tying the listener to our own
    // signal removes it from the parent as soon as we stop on our own, so
    // short-lived actors do not pile up listeners on a long-lived parent.
    parentSignal.addEventListener("abort", () => childAc.abort(), {
      once: true,
      signal: childAc.signal,
    });

    /** A child escalated: run our own onCrash as if we had crashed. */
    function handleChildEscalation(error: unknown) {
      const directive =
        config.onCrash?.(error, undefined as any, ctx) ?? "stop";
      applyDirective(directive, error);
    }

    /** Carries out a crash directive for this actor. */
    function applyDirective(directive: CrashDirective, error: unknown) {
      switch (directive) {
        case "resume":
          // keep state; the failed message has already been dequeued
          break;
        case "reset":
          state = config.initialState;
          break;
        case "stop":
          self.stop();
          break;
        case "escalate":
          if (onEscalate) {
            onEscalate(error);
          } else {
            console.error(
              `Unhandled escalation in actor "${config.name}":`,
              error,
            );
            self.stop();
          }
          break;
      }
    }

    const self: ActorRef<M> = {
      name: config.name,
      signal: childAc.signal,
      send(message: M) {
        if (childAc.signal.aborted) return; // dead letter
        queue.push(message);
        void drain();
      },
      stop() {
        if (childAc.signal.aborted) return;
        // Abort listeners run synchronously: teardown first, then children.
        childAc.abort();
      },
    };

    const ctx: Ctx<M> = {
      self,
      signal: childAc.signal,
      spawn: <S2, M2>(childConfig: SpawnConfig<S2, M2>) => {
        const child = spawnActor(
          childConfig,
          children,
          childAc.signal,
          handleChildEscalation,
        );
        // A handler still running after its actor was stopped must not leave
        // a live orphan behind: an aborted signal never fires "abort" again.
        if (childAc.signal.aborted) child.stop();
        return child;
      },
      lookup,
    };

    /** Processes queued messages one at a time until the mailbox is empty. */
    async function drain(): Promise<void> {
      if (processing || childAc.signal.aborted) return;
      processing = true;
      try {
        while (queue.length > 0 && !childAc.signal.aborted) {
          const msg = queue.shift()!;
          try {
            const result = await config.onMessage(state, msg, ctx);
            if (result === Done) {
              config.onDone?.(ctx);
              self.stop();
              return;
            }
            state = result as S;
          } catch (error) {
            const directive = config.onCrash?.(error, msg, ctx) ?? "stop";
            applyDirective(directive, error);
            if (childAc.signal.aborted) return;
          }
        }
      } finally {
        // Always release the flag; otherwise a throwing onCrash would leave
        // the actor registered but permanently unable to process messages.
        processing = false;
      }
    }

    registry.set(config.name, self);
    parent.add(self);
    return self;
  }

  return {
    signal: ac.signal,
    spawn: <S, M>(config: SpawnConfig<S, M>) => {
      if (ac.signal.aborted) throw new Error("System is stopped");
      return spawnActor(config, topLevel, ac.signal);
    },
    lookup,
    stop() {
      ac.abort(); // cascades through entire tree
    },
  };
}
// -- ask --
/** Options for `ask`. */
export type AskOptions = {
  /** Milliseconds to wait for the reply before rejecting; defaults to 5000. */
  timeoutMs?: number;
  /** Optional external signal; aborting it rejects the ask. */
  signal?: AbortSignal;
};

/**
 * Sends a message that expects a reply and resolves with that reply.
 *
 * A temporary top-level actor is spawned as the reply address and handed to
 * `factory`, which builds the message sent to `target`.
 *
 * @param system  system the temporary reply actor is spawned in
 * @param target  actor that receives the request
 * @param factory builds the request around the reply address
 * @param opts    timeout and optional external cancellation signal
 * @returns the first message sent to the reply address
 * @throws (rejects) `Error("Ask timed out")` on timeout, `Error("Ask aborted")`
 *         when the system or `opts.signal` aborts, or whatever `system.spawn`
 *         throws (e.g. when the system is already stopped)
 */
export function ask<M, R>(
  system: ActorSystem,
  target: ActorRef<M>,
  factory: (replyTo: ActorRef<R>) => M,
  opts: AskOptions = {},
): Promise<R> {
  return new Promise<R>((resolve, reject) => {
    const timeout = AbortSignal.timeout(opts.timeoutMs ?? 5000);
    const signals = [timeout, system.signal];
    if (opts.signal) signals.push(opts.signal);
    const combined = AbortSignal.any(signals);

    const onAbort = () => {
      ref.stop();
      const reason = combined.reason;
      if (reason instanceof DOMException && reason.name === "TimeoutError") {
        reject(new Error("Ask timed out"));
      } else {
        reject(new Error("Ask aborted"));
      }
    };

    const ref = system.spawn<undefined, R>({
      name: `_ask_${crypto.randomUUID()}`,
      initialState: undefined,
      onMessage: async (_state, reply) => {
        // The ask is settled; drop the listener so it does not linger until
        // the timeout fires.
        combined.removeEventListener("abort", onAbort);
        resolve(reply);
        return Done;
      },
    });

    // A signal that is already aborted never dispatches "abort" again, so a
    // pre-cancelled ask must be rejected here or it could hang forever.
    if (combined.aborted) {
      onAbort();
      return;
    }

    combined.addEventListener("abort", onAbort, { once: true });
    target.send(factory(ref));
  });
}
// -- fromCallback --
/** Functions handed to a callback setup. */
type CallbackAPI<TSend, TReceive = TSend> = {
  /** Pushes a message into the actor's mailbox from callback-land. */
  send: (msg: TSend) => void;
  /** Registers the handler invoked for every non-`Done` message the actor processes. */
  receive: (handler: (msg: TReceive) => void) => void;
};
/**
 * Wires a callback-based source to the actor. `signal` aborts when the actor
 * stops; use it to remove listeners, clear timers and so on.
 */
type CallbackSetup<TSend, TReceive = TSend> = (
  api: CallbackAPI<TSend, TReceive>,
  signal: AbortSignal,
) => void;

/**
 * Spawns an actor that bridges a callback-style API into the system.
 *
 * @param system system the actor is spawned in
 * @param name   actor name
 * @param setup  called once, synchronously, with the send/receive API and the actor's signal
 * @returns the actor's reference; sending it `Done` finishes the actor
 * @throws whatever `system.spawn` or `setup` throws
 */
export function fromCallback<TSend, TReceive = TSend>(
  system: ActorSystem,
  name: string,
  setup: CallbackSetup<TSend, TReceive>,
): ActorRef<TReceive> {
  let listener: ((msg: TReceive) => void) | null = null;
  const ref = system.spawn<null, TReceive | typeof Done>({
    name,
    initialState: null,
    onMessage: async (_state, msg) => {
      if (msg === Done) return Done;
      listener?.(msg as TReceive);
      return null;
    },
  });
  try {
    setup(
      {
        send: (msg: TSend) => ref.send(msg as any),
        receive: (handler) => {
          listener = handler;
        },
      },
      ref.signal,
    );
  } catch (error) {
    // A failed setup must not leave a half-wired actor registered under
    // `name`; stopping it also aborts the signal, releasing anything setup
    // had already attached to it.
    ref.stop();
    throw error;
  }
  return ref as ActorRef<TReceive>;
}
// -- fromPromise --
/** Outcome message delivered to the target of `fromPromise`. */
type PromiseResult<T> =
  | { type: "resolved"; value: T }
  | { type: "rejected"; error: unknown };

/**
 * Spawns an actor that represents one piece of async work.
 *
 * `work` is started immediately with the actor's signal. When it settles the
 * outcome is sent to `target` (unless the actor was stopped in the meantime)
 * and the actor stops itself.
 *
 * @param system system the actor is spawned in
 * @param name   actor name
 * @param work   the async job; receives a signal that aborts when the actor stops
 * @param target receives `{ type: "resolved" }` or `{ type: "rejected" }`
 * @returns the actor's reference (it accepts no messages)
 */
export function fromPromise<T>(
  system: ActorSystem,
  name: string,
  work: (signal: AbortSignal) => Promise<T>,
  target: ActorRef<PromiseResult<T>>,
): ActorRef<never> {
  const ref = system.spawn<null, never>({
    name,
    initialState: null,
    onMessage: async () => null,
  });

  // A non-async `work` can throw before it returns a promise. Treat that as
  // a rejection so the target is still notified and the actor is not left
  // registered forever.
  let pending: Promise<T>;
  try {
    pending = work(ref.signal);
  } catch (error) {
    pending = Promise.reject(error);
  }

  pending.then(
    (value) => {
      if (!ref.signal.aborted) {
        target.send({ type: "resolved", value });
      }
      ref.stop();
    },
    (error) => {
      if (!ref.signal.aborted) {
        target.send({ type: "rejected", error });
      }
      ref.stop();
    },
  );
  return ref;
}
// -- fromObservable --
/** Minimal observable shape: anything with `subscribe(next, error?, complete?)`. */
type Subscribable<T> = {
  subscribe(
    next: (value: T) => void,
    error?: (err: unknown) => void,
    complete?: () => void,
  ): { unsubscribe(): void };
};

/**
 * Spawns an actor that forwards every emission of `observable` to `target`.
 *
 * The actor stops when the observable completes or errors (errors are
 * logged), when it receives `Done`, or when it is stopped from outside. In
 * every case the subscription is released.
 *
 * @param system     system the actor is spawned in
 * @param name       actor name
 * @param observable source of values
 * @param target     actor that receives each emitted value
 * @returns the actor's reference; send it `Done` to end the subscription
 * @throws whatever `system.spawn` or `observable.subscribe` throws
 */
export function fromObservable<T>(
  system: ActorSystem,
  name: string,
  observable: Subscribable<T>,
  target: ActorRef<T>,
): ActorRef<typeof Done> {
  const ref = system.spawn<null, typeof Done>({
    name,
    initialState: null,
    onMessage: async (_state, msg) => {
      if (msg === Done) return Done;
      return null;
    },
  });

  let sub: { unsubscribe(): void };
  try {
    sub = observable.subscribe(
      (value) => target.send(value),
      (err) => {
        console.error(`Observable error in "${name}":`, err);
        ref.stop();
      },
      () => ref.stop(),
    );
  } catch (error) {
    // Do not leave the actor registered when subscribing itself failed.
    ref.stop();
    throw error;
  }

  if (ref.signal.aborted) {
    // The observable completed or errored synchronously inside subscribe(),
    // so the actor is already stopped and "abort" will not fire again.
    sub.unsubscribe();
  } else {
    // Actor death → unsubscribe.
    ref.signal.addEventListener("abort", () => sub.unsubscribe(), { once: true });
  }
  return ref as ActorRef<typeof Done>;
}
/**
 * Sentinel an `onMessage` handler returns instead of a new state to say the
 * actor is finished. The system then calls `onDone` and stops the actor.
 */
export const Done = Symbol("Done");
/**
 * What the system does with an actor after its `onMessage` threw:
 * - "resume":   keep the current state and continue with the next message
 * - "reset":    replace the state with `initialState` and continue
 * - "stop":     stop the actor (and its children)
 * - "escalate": hand the error to the parent's `onCrash`; an actor without a
 *               parent logs the error and stops
 */
export type CrashDirective = "resume" | "reset" | "stop" | "escalate";
/** Handle to a running actor that accepts messages of type `M`. */
export type ActorRef<M> = {
// Enqueues a message; silently dropped once the actor has stopped.
send(message: M): void;
// Stops the actor and its children; a second call does nothing.
stop(): void;
// Name the actor was spawned under (unique within its system).
readonly name: string;
};
/** Capabilities handed to an actor's own callbacks. */
export type Ctx<M> = {
// Reference to the actor itself, e.g. to pass along as a reply address.
self: ActorRef<M>;
// Spawns a child that is stopped together with this actor.
spawn: <S2, M2>(config: SpawnConfig<S2, M2>) => ActorRef<M2>;
// Finds any registered actor by name; throws if none is registered.
lookup: <M2>(name: string) => ActorRef<M2>;
};
/** Definition of an actor with state `S` and messages `M`. */
export type SpawnConfig<S, M> = {
// Registry key; spawning a second actor with a name still in use throws.
name: string;
// State passed to the first `onMessage` call and restored by "reset".
initialState: S;
// Handles one message at a time; resolves to the next state or `Done`.
onMessage: (state: S, message: M, ctx: Ctx<M>) => Promise<S | typeof Done>;
// Chooses the directive after `onMessage` throws; defaults to "stop" when omitted.
// Also invoked for errors escalated by a child, in which case `message` is undefined.
onCrash?: (error: unknown, message: M, ctx: Ctx<M>) => CrashDirective;
// Called once after `onMessage` returned `Done`, before the actor is stopped.
onDone?: (ctx: Ctx<M>) => void;
};
/** Root of an actor tree. */
export type ActorSystem = {
// Spawns a top-level actor; throws once the system has been stopped.
spawn: <S, M>(config: SpawnConfig<S, M>) => ActorRef<M>;
// Finds a registered actor by name; throws if none is registered.
lookup: <M>(name: string) => ActorRef<M>;
// Stops every top-level actor (and, through them, the whole tree).
stop(): void;
};
/**
 * Creates an actor system that tracks liveness with plain flags.
 *
 * Actors are registered by name, process their mailbox one message at a
 * time, and stop their children when they stop.
 *
 * @returns the system handle used to spawn, look up and stop actors.
 */
export function createSystem(): ActorSystem {
  const registry = new Map<string, ActorRef<any>>();
  const topLevel = new Set<ActorRef<any>>();
  let stopped = false;

  /** Returns the registered actor called `name`; throws if there is none. */
  function lookup<M>(name: string): ActorRef<M> {
    const ref = registry.get(name);
    if (!ref) throw new Error(`Actor not found: ${name}`);
    return ref;
  }

  /**
   * Creates and registers one actor.
   *
   * @param config     actor definition
   * @param parent     set the new actor is tracked in (siblings)
   * @param onEscalate receives errors this actor escalates; absent for top-level actors
   * @throws Error when `config.name` is already registered
   */
  function spawnActor<S, M>(
    config: SpawnConfig<S, M>,
    parent: Set<ActorRef<any>>,
    onEscalate?: (error: unknown) => void,
  ): ActorRef<M> {
    if (registry.has(config.name)) {
      throw new Error(`Actor already exists: ${config.name}`);
    }
    let state = config.initialState;
    let processing = false;
    let alive = true;
    const queue: M[] = [];
    const children = new Set<ActorRef<any>>();

    function handleChildEscalation(error: unknown) {
      // child escalated to us — run our own onCrash as if we crashed
      const directive = config.onCrash?.(error, undefined as any, ctx) ?? "stop";
      applyDirective(directive, error);
    }

    /** Carries out a crash directive for this actor. */
    function applyDirective(directive: CrashDirective, error: unknown) {
      switch (directive) {
        case "resume":
          // keep state, keep going — drop the bad message
          break;
        case "reset":
          // restart with initial state, keep children, keep mailbox
          state = config.initialState;
          break;
        case "stop":
          self.stop();
          break;
        case "escalate":
          if (onEscalate) {
            onEscalate(error);
          } else {
            // top-level with no handler — stop
            console.error(`Unhandled escalation in actor "${config.name}":`, error);
            self.stop();
          }
          break;
      }
    }

    const self: ActorRef<M> = {
      name: config.name,
      send(message: M) {
        if (!alive) return; // dead letter
        queue.push(message);
        void drain();
      },
      stop() {
        if (!alive) return;
        alive = false;
        for (const child of children) child.stop();
        children.clear();
        queue.length = 0;
        registry.delete(config.name);
        parent.delete(self);
      },
    };

    const ctx: Ctx<M> = {
      self,
      spawn: <S2, M2>(childConfig: SpawnConfig<S2, M2>) => {
        const child = spawnActor(childConfig, children, handleChildEscalation);
        // A handler still running after its actor was stopped must not leave
        // a live orphan: nothing would ever stop it, not even system.stop(),
        // because this actor is no longer reachable from the top-level set.
        if (!alive) child.stop();
        return child;
      },
      lookup,
    };

    /** Processes queued messages one at a time until the mailbox is empty. */
    async function drain(): Promise<void> {
      if (processing || !alive) return;
      processing = true;
      try {
        while (queue.length > 0 && alive) {
          const msg = queue.shift()!;
          try {
            const result = await config.onMessage(state, msg, ctx);
            if (result === Done) {
              config.onDone?.(ctx);
              self.stop();
              return;
            }
            state = result as S;
          } catch (error) {
            const directive = config.onCrash?.(error, msg, ctx) ?? "stop";
            applyDirective(directive, error);
            if (!alive) return;
          }
        }
      } finally {
        // Always release the flag; otherwise a throwing onCrash would leave
        // the actor registered but permanently unable to process messages.
        processing = false;
      }
    }

    registry.set(config.name, self);
    parent.add(self);
    return self;
  }

  return {
    spawn: <S, M>(config: SpawnConfig<S, M>) => {
      if (stopped) throw new Error("System is stopped");
      return spawnActor(config, topLevel);
    },
    lookup,
    stop() {
      stopped = true;
      for (const ref of topLevel) ref.stop();
      topLevel.clear();
    },
  };
}
// ask: send a message expecting a reply, using a temp actor
/**
 * Sends a message that expects a reply and resolves with that reply.
 *
 * A temporary top-level actor is spawned as the reply address and handed to
 * `factory`, which builds the message sent to `target`.
 *
 * @param system    system the temporary reply actor is spawned in
 * @param target    actor that receives the request
 * @param factory   builds the request around the reply address
 * @param timeoutMs milliseconds to wait for the reply (default 5000)
 * @returns the first message sent to the reply address
 * @throws (rejects) `Error("Ask timed out")` on timeout, or whatever
 *         `system.spawn`, `factory` or `target.send` throws
 */
export function ask<M, R>(
  system: ActorSystem,
  target: ActorRef<M>,
  factory: (replyTo: ActorRef<R>) => M,
  timeoutMs = 5000,
): Promise<R> {
  return new Promise<R>((resolve, reject) => {
    let timer: ReturnType<typeof setTimeout> | undefined;

    // Spawn before arming the timer. If spawn throws (system stopped), a
    // timer armed earlier would fire later and touch `ref` before it was
    // initialised, raising an uncaught ReferenceError.
    const ref = system.spawn<undefined, R>({
      name: `_ask_${crypto.randomUUID()}`,
      initialState: undefined,
      onMessage: async (_state, reply) => {
        clearTimeout(timer);
        resolve(reply);
        return Done;
      },
    });

    timer = setTimeout(() => {
      ref.stop();
      reject(new Error("Ask timed out"));
    }, timeoutMs);

    try {
      target.send(factory(ref));
    } catch (error) {
      // The request never left: release the timer and the reply actor now
      // instead of holding them until the timeout.
      clearTimeout(timer);
      ref.stop();
      throw error; // inside the executor this rejects the promise
    }
  });
}
// -- fromCallback --
// You get a `send` function to push messages in from callback-land.
// Return a cleanup function (removeListener, clearInterval, etc).
/** Functions handed to a callback setup. */
type CallbackAPI<TSend, TReceive = TSend> = {
  /** Pushes a message into the actor's mailbox from callback-land. */
  send: (msg: TSend) => void;
  /** Registers the handler invoked for every non-`Done` message the actor processes. */
  receive: (handler: (msg: TReceive) => void) => void;
};
/** Wires a callback-based source to the actor; may return a cleanup function. */
type CallbackSetup<TSend, TReceive = TSend> = (
  api: CallbackAPI<TSend, TReceive>,
) => (() => void) | void;

/**
 * Spawns an actor that bridges a callback-style API into the system.
 *
 * The cleanup function returned by `setup` runs exactly once, when the actor
 * receives `Done` or when `stop()` is called on its reference.
 *
 * @param system system the actor is spawned in
 * @param name   actor name
 * @param setup  called once, synchronously, with the send/receive API
 * @returns the actor's reference
 * @throws whatever `system.spawn` or `setup` throws
 */
export function fromCallback<TSend, TReceive = TSend>(
  system: ActorSystem,
  name: string,
  setup: CallbackSetup<TSend, TReceive>,
): ActorRef<TReceive> {
  let cleanup: (() => void) | void;
  let setupFinished = false;
  let teardownRequested = false;
  let cleaned = false;
  let listener: ((msg: TReceive) => void) | null = null;

  // Runs the user's cleanup at most once. A request that arrives while
  // setup() is still running is remembered and honoured right after it.
  const runCleanup = () => {
    teardownRequested = true;
    if (!setupFinished || cleaned) return;
    cleaned = true;
    cleanup?.();
  };

  const ref = system.spawn<null, TReceive | typeof Done>({
    name,
    initialState: null,
    onMessage: async (_state, msg) => {
      if (msg === Done) {
        runCleanup();
        return Done;
      }
      listener?.(msg as TReceive);
      return null;
    },
  });

  // The ref type offers no stop hook, so wrap stop() on the ref itself.
  // Previously cleanup ran only on a `Done` message, so a plain ref.stop()
  // left timers and listeners created by setup() running.
  const stopActor = ref.stop.bind(ref);
  ref.stop = () => {
    runCleanup();
    stopActor();
  };

  try {
    cleanup = setup({
      send: (msg: TSend) => ref.send(msg as any),
      receive: (handler) => { listener = handler; },
    });
  } catch (error) {
    // Do not leave a half-wired actor registered under `name`.
    stopActor();
    throw error;
  }
  setupFinished = true;
  if (teardownRequested) runCleanup();

  return ref as ActorRef<TReceive>;
}
// -- fromPromise --
// Spawns an actor that runs a promise, then sends the outcome to `target`.
// Actor self-terminates after. Rejections are delivered as a "rejected" result.
/** Outcome message delivered to the target of `fromPromise`. */
type PromiseResult<T> = { type: "resolved"; value: T } | { type: "rejected"; error: unknown };

/**
 * Spawns an actor that represents one piece of async work.
 *
 * `work` is started immediately. When it settles the outcome is sent to
 * `target` and the actor stops itself.
 *
 * @param system system the actor is spawned in
 * @param name   actor name
 * @param work   the async job
 * @param target receives `{ type: "resolved" }` or `{ type: "rejected" }`
 * @returns the actor's reference (it accepts no messages)
 */
export function fromPromise<T>(
  system: ActorSystem,
  name: string,
  work: () => Promise<T>,
  target: ActorRef<PromiseResult<T>>,
): ActorRef<never> {
  const ref = system.spawn<null, never>({
    name,
    initialState: null,
    onMessage: async () => null, // no inbound messages expected
  });

  // A non-async `work` can throw before it returns a promise. Treat that as
  // a rejection so the target is still notified and the actor is not left
  // registered forever.
  let pending: Promise<T>;
  try {
    pending = work();
  } catch (error) {
    pending = Promise.reject(error);
  }

  pending.then(
    (value) => {
      target.send({ type: "resolved", value });
      ref.stop();
    },
    (error) => {
      target.send({ type: "rejected", error });
      ref.stop();
    },
  );
  return ref;
}
// -- fromObservable --
// Subscribes to anything with a `subscribe(next, error?, complete?)` shape.
// Forwards emissions as messages to `target`. Cleans up on stop.
/** Minimal observable shape. */
type Subscribable<T> = {
  subscribe(
    next: (value: T) => void,
    error?: (err: unknown) => void,
    complete?: () => void,
  ): { unsubscribe(): void };
};

/**
 * Spawns an actor that forwards every emission of `observable` to `target`.
 *
 * The subscription is released exactly once, whichever way the actor ends:
 * a `Done` message, `stop()` on its reference, or the observable completing
 * or erroring (errors are logged).
 *
 * @param system     system the actor is spawned in
 * @param name       actor name
 * @param observable source of values
 * @param target     actor that receives each emitted value
 * @returns the actor's reference; send it `Done` to end the subscription
 * @throws whatever `system.spawn` or `observable.subscribe` throws
 */
export function fromObservable<T>(
  system: ActorSystem,
  name: string,
  observable: Subscribable<T>,
  target: ActorRef<T>,
): ActorRef<typeof Done> {
  let sub: { unsubscribe(): void } | null = null;
  let released = false;

  // Releases the subscription once. If called before subscribe() has
  // returned, the request is remembered and honoured right after.
  const release = () => {
    released = true;
    const current = sub;
    sub = null;
    current?.unsubscribe();
  };

  const ref = system.spawn<null, typeof Done>({
    name,
    initialState: null,
    onMessage: async (_state, msg) => {
      if (msg === Done) {
        release();
        return Done;
      }
      return null;
    },
    onDone: () => release(),
  });

  // The ref type offers no stop hook, so wrap stop() on the ref itself.
  // Previously only a `Done` message unsubscribed; ref.stop() left the
  // subscription open and still forwarding values.
  const stopActor = ref.stop.bind(ref);
  ref.stop = () => {
    release();
    stopActor();
  };

  try {
    sub = observable.subscribe(
      (value) => target.send(value),
      (err) => {
        console.error(`Observable error in "${name}":`, err);
        ref.stop();
      },
      () => ref.stop(),
    );
  } catch (error) {
    // Do not leave the actor registered when subscribing itself failed.
    stopActor();
    throw error;
  }

  // The observable ended synchronously inside subscribe(): the handle did not
  // exist yet when release() ran, so release it now.
  if (released) release();

  return ref as ActorRef<typeof Done>;
}
import {
createSystem,
ask,
Done,
fromCallback,
fromPromise,
fromObservable,
type ActorRef,
} from "./actors";
const system = createSystem();

// -- Example: ping-pong with a counter that stops after 3 --
type PingMsg = { type: "ping"; replyTo: ActorRef<PongMsg> };
type PongMsg = { type: "pong"; count: number };

// Stateless responder: answers every ping with a pong to the given address.
const ponger = system.spawn<null, PingMsg>({
  name: "ponger",
  initialState: null,
  async onMessage(_state, { replyTo }) {
    replyTo.send({ type: "pong", count: 1 });
    return null;
  },
});

// Counts received pongs in its state; asks for another until the third one.
const pinger = system.spawn<number, PongMsg>({
  name: "pinger",
  initialState: 0,
  async onMessage(received, _pong, { self }) {
    const total = received + 1;
    console.log(`pinger: received pong #${total}`);
    if (total < 3) {
      ponger.send({ type: "ping", replyTo: self });
      return total;
    }
    return Done;
  },
  onDone() {
    console.log("pinger: done after 3 pongs");
  },
});

// The first ping starts the exchange.
ponger.send({ type: "ping", replyTo: pinger });
// -- Example: ask pattern (now with signal composition) --
type MathMsg = { op: "add"; a: number; b: number; replyTo: ActorRef<number> };

// Stateless adder: replies with the sum straight to the asker.
const calculator = system.spawn<null, MathMsg>({
  name: "calculator",
  initialState: null,
  async onMessage(_state, { a, b, replyTo }) {
    replyTo.send(a + b);
    return null;
  },
});

// Builds the message factory `ask` expects for one pair of operands.
const addRequest =
  (a: number, b: number) =>
  (replyTo: ActorRef<number>): MathMsg => ({ op: "add", a, b, replyTo });

// Plain ask: only a timeout, passed through the options object.
const result = await ask<MathMsg, number>(system, calculator, addRequest(17, 25), {
  timeoutMs: 3000,
});
console.log(`ask result: 17 + 25 = ${result}`);

// Ask that the caller can cancel from outside (e.g. the user navigated away).
const userAc = new AbortController();
try {
  const result2 = await ask<MathMsg, number>(system, calculator, addRequest(1, 2), {
    timeoutMs: 5000,
    signal: userAc.signal,
  });
  console.log(`ask result: 1 + 2 = ${result2}`);
} catch (e) {
  console.log("ask was cancelled or timed out:", (e as Error).message);
}
// -- Example: parent-child hierarchy (observe lifecycle via signal) --
const parent = system.spawn<null, string>({
  name: "parent",
  initialState: null,
  async onMessage(_state, command, ctx) {
    if (command !== "spawn-child") return null;

    // The child counts the messages it has handled in its state.
    const child = ctx.spawn({
      name: "child",
      initialState: 0,
      async onMessage(handled, _msg) {
        const count = handled + 1;
        console.log(`child: message #${count}`);
        return count;
      },
    });

    // Anyone holding the ref can watch the child's lifetime through its signal.
    const onChildStopped = () => {
      console.log("child was stopped (observed via signal)");
    };
    child.signal.addEventListener("abort", onChildStopped, { once: true });
    return null;
  },
});

parent.send("spawn-child");

// Stopping the parent aborts its signal, which takes the child down with it.
setTimeout(() => {
  parent.stop();
  console.log("parent stopped (child stopped with it)");
}, 100);
// -- Example: onMessage using ctx.signal for cancellable work --
const fetcher = system.spawn<null, { url: string }>({
  name: "fetcher",
  initialState: null,
  async onMessage(_state, { url }, { signal }) {
    // Passing the actor's signal aborts the request if the actor stops mid-fetch.
    const res = await fetch(url, { signal });
    console.log(`fetched ${url}: ${res.status}`);
    return null;
  },
  onCrash(error) {
    // An AbortError means the actor was stopped on purpose; anything else is
    // treated as transient, so carry on with the next message.
    const wasAborted = error instanceof DOMException && error.name === "AbortError";
    return wasAborted ? "stop" : "resume";
  },
});
// -- fromCallback: wrap setInterval (signal replaces return cleanup) --
const ticker = fromCallback<number>(system, "ticker", (api, signal) => {
  let tick = 0;
  const emitTick = () => {
    tick += 1;
    api.send(tick);
  };
  const intervalId = setInterval(emitTick, 1000);
  // The interval is cleared when the actor's signal aborts; nothing is returned.
  const stopTicking = () => clearInterval(intervalId);
  signal.addEventListener("abort", stopTicking, { once: true });
});

// Let it tick for three and a half seconds, then stop the actor.
setTimeout(() => {
  ticker.stop();
}, 3500);
// -- fromCallback: bidirectional WebSocket (signal for all listeners) --
type WsOut = { type: "send"; data: string };
type WsIn = { type: "message"; data: string };
const wsActor = fromCallback<WsIn, WsOut>(system, "ws", ({ send, receive }, signal) => {
  const socket = new WebSocket("ws://localhost:3000");
  // Frames requested before the handshake finished. socket.send() throws
  // InvalidStateError while the socket is still CONNECTING, which would crash
  // the actor on the very first message below, so hold them until "open".
  const pending: string[] = [];

  // All listeners auto-removed when actor stops
  socket.addEventListener("open", () => {
    console.log("ws connected");
    for (const data of pending.splice(0)) socket.send(data);
  }, { signal });
  socket.addEventListener("message", (e) => {
    send({ type: "message", data: e.data });
  }, { signal });
  socket.addEventListener("error", (e) => {
    console.error("ws error:", e);
  }, { signal });

  // Outbound: actor system → socket
  receive((msg) => {
    if (msg.type !== "send") return;
    if (socket.readyState === WebSocket.CONNECTING) {
      pending.push(msg.data);
    } else {
      socket.send(msg.data);
    }
  });

  // Close socket on actor teardown
  signal.addEventListener("abort", () => socket.close(), { once: true });
});
wsActor.send({ type: "send", data: "hello server" });
// -- fromPromise: one-shot async work (signal enables cancellation) --
// Sink actor that prints whatever it is sent; it is reused by later examples.
const logger = system.spawn<null, any>({
  name: "logger",
  initialState: null,
  async onMessage(_state, received) {
    console.log("logger received:", received);
    return null;
  },
});

// The signal is forwarded to fetch, so stopping the actor aborts the request.
const loadUser = async (signal: AbortSignal) => {
  const response = await fetch("https://api.example.com/user/1", { signal });
  return response.json();
};
fromPromise(system, "fetch-user", loadUser, logger);
// logger receives: { type: "resolved", value: { id: 1, name: "James" } }
// -- fromObservable: any Subscribable shape --
// Hand-rolled source: two values synchronously, then one more followed by
// completion 100 ms later. Unsubscribing is a no-op.
const fakeObservable = {
  subscribe(
    emit: (v: string) => void,
    _onError?: (e: unknown) => void,
    onComplete?: () => void,
  ) {
    for (const word of ["hello", "world"]) emit(word);
    const finish = () => {
      emit("async one");
      if (onComplete) onComplete();
    };
    setTimeout(finish, 100);
    return { unsubscribe() {} };
  },
};
fromObservable(system, "words", fakeObservable, logger);
// logger receives: "hello", "world", "async one"
// -- Graceful shutdown: one abort tears down everything --
function shutDownOnSigterm() {
  console.log("SIGTERM received, shutting down...");
  // A single stop() aborts the system signal and with it the whole actor tree.
  system.stop();
}
process.on("SIGTERM", shutDownOnSigterm);

// The system signal can be observed from anywhere to react to shutdown.
function reportShutdown() {
  console.log("system shutdown complete");
}
system.signal.addEventListener("abort", reportShutdown, { once: true });
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment