Skip to content

Instantly share code, notes, and snippets.

@shovon
Last active November 9, 2025 22:36
Show Gist options
  • Select an option

  • Save shovon/3901314e2b24730531a3279019e58cf7 to your computer and use it in GitHub Desktop.

Select an option

Save shovon/3901314e2b24730531a3279019e58cf7 to your computer and use it in GitHub Desktop.
An actor model implementation in TypeScript
type WithStringType = { type: string };
type Unsubscribe = () => void;
type MachineEvent = WithStringType;
type MachineState = WithStringType;
type IdleState = { type: "IDLE" };
type StoppedState = { type: "STOPPED" };
type ActorState<TState extends WithStringType> =
| IdleState
| StoppedState
| TState;
export type Actor<TEvent extends MachineEvent, TState extends MachineState> = {
/**
* Sends an event to the actor, which will have it process an event.
*
* May or may not update the actor's state immediately, if at all.
* @param event The event to send to the actor.
*/
send(event: TEvent): void;
/**
* Add an event listener to the event when the internal state of an actor
* updates.
* @param listener A callback function to be invoked when the internal state
* updates.
* @param option Additional options for the subscription.
*/
subscribe(
listener: (state: TState) => void,
option?: { immediate?: boolean }
): Unsubscribe;
/**
* Get a snapshot of the state of the actor.
*/
getSnapshot(): ActorState<TState>;
/**
* Stop the actor entirely; prevent it from processing any more events.
*/
stop(): void;
};
import { createJobActor, type JobActor, type JobRunner } from "./job-actor";
type JobActorMeta<TInput, TResult> = {
actor: JobActor<TInput, TResult>;
references: Set<object>;
unsubscribe: () => void;
};
export function createJobPool<TKey, TInput, TResult>(
init: (key: TKey) => JobRunner<TInput, TResult>
) {
const actorPool = new Map<TKey, JobActorMeta<TInput, TResult>>();
const referenceToKey = new WeakMap<object, [TKey]>();
const deleteActor = (key: TKey) => {
const actorMeta = actorPool.get(key);
if (!actorMeta) return;
actorMeta.unsubscribe();
actorPool.delete(key);
};
const newActorMeta = (key: TKey) => {
const actor = createJobActor(init(key));
const unsubscribe = actor.subscribe(() => {
clearUnused(key);
});
const actorMeta = { actor, references: new Set<object>(), unsubscribe };
actorPool.set(key, actorMeta);
return actorMeta;
};
const replaceActorMeta = (key: TKey) => {
deleteActor(key);
return newActorMeta(key);
};
const clearUnused = (key: TKey) => {
const actorMeta = actorPool.get(key);
if (!actorMeta) return;
switch (actorMeta.actor.getSnapshot().type) {
case "DONE":
case "IDLE":
case "CANCELED":
case "FAILED":
if (actorMeta.references.size <= 0) {
deleteActor(key);
}
break;
case "STOPPED":
deleteActor(key);
break;
}
};
return {
acquire: (key: TKey): [JobActor<TInput, TResult>, object] => {
let actorMeta: JobActorMeta<TInput, TResult>;
if (!actorPool.has(key)) {
actorMeta = newActorMeta(key);
}
actorMeta = actorPool.get(key)!;
if (actorMeta.actor.getSnapshot().type === "STOPPED") {
actorMeta = replaceActorMeta(key);
}
const reference = {};
actorMeta.references.add(reference);
referenceToKey.set(reference, [key]);
return [actorMeta.actor, reference];
},
free: (reference: object) => {
const keyTuple = referenceToKey.get(reference);
referenceToKey.delete(reference);
if (!keyTuple || keyTuple.length <= 0) return;
const [key] = keyTuple;
const actorMeta = actorPool.get(key);
if (!actorMeta) return;
actorMeta.references.delete(reference);
clearUnused(key);
},
};
}
import { gate, subject } from "../../utils";
import type { Actor } from "./actor";
type JobRunnerEvent<TInput> =
| {
type: "RUN_JOB";
input: TInput;
}
| {
type: "CANCEL_RUNNING_JOB";
};
type JobRunnerState<TResult> =
| {
type: "IDLE";
}
| {
type: "RUNNING";
}
| {
type: "DONE";
result: TResult;
}
| {
type: "STOPPED";
}
| {
type: "CANCELED";
}
| {
type: "FAILED";
error: unknown;
};
export type JobActor<TInput, TResult> = Actor<
JobRunnerEvent<TInput>,
JobRunnerState<TResult>
>;
export type JobRunner<TInput, TResult> = (
input: TInput,
onStop: ReturnType<typeof gate>["listen"]
) => Promise<TResult>;
export const createJobActor = <TInput, TResult>(
fn: JobRunner<TInput, TResult>
): JobActor<TInput, TResult> => {
type LocalState = JobRunnerState<TResult>;
const jobSubject = subject<LocalState>();
let currentState: LocalState = { type: "IDLE" };
let runningJobStopper: (() => void) | null = null;
const stopRunningJob = () => {
runningJobStopper?.();
runningJobStopper = null;
};
function setState(state: JobRunnerState<TResult>) {
currentState = state;
jobSubject.next(state);
}
function runJob(input: TInput) {
const stopGate = gate();
runningJobStopper = stopGate.open;
const promise = (async () => {
return fn(input, stopGate.listen);
})();
const runningState = { type: "RUNNING" } satisfies LocalState;
setState(runningState);
const clearRunningJobStopper = () => {
runningJobStopper = null;
};
promise.then(
(result) => {
clearRunningJobStopper();
if (currentState.type === "RUNNING") {
setState({ type: "DONE", result });
}
},
(error) => {
clearRunningJobStopper();
if (currentState.type === "RUNNING") {
setState({ type: "FAILED", error });
}
}
);
}
return {
send: (event) => {
if (currentState.type === "STOPPED") {
console.error("The actor has already been stopped");
return;
}
switch (event.type) {
case "RUN_JOB":
if (currentState.type !== "RUNNING") {
runJob(event.input);
}
break;
case "CANCEL_RUNNING_JOB":
if (currentState.type === "RUNNING") {
setState({ type: "CANCELED" });
stopRunningJob();
}
break;
}
},
subscribe: (listener, option) => {
if (option?.immediate && currentState.type === "IDLE") {
listener(currentState);
}
return jobSubject.listen(listener, option?.immediate ?? false);
},
getSnapshot: () => currentState,
stop: () => {
setState({ type: "STOPPED" });
stopRunningJob();
},
};
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment