Last active
November 9, 2025 22:36
-
-
Save shovon/3901314e2b24730531a3279019e58cf7 to your computer and use it in GitHub Desktop.
An actor model implementation in TypeScript
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Minimal shape shared by every event and state: a string `type` tag. */
type WithStringType = { type: string };

/** Zero-argument callback, used as the return type of `Actor.subscribe`. */
type Unsubscribe = () => void;

/** Any event an actor can receive; discriminated by its `type` tag. */
type MachineEvent = WithStringType;

/** Any state an actor can be in; discriminated by its `type` tag. */
type MachineState = WithStringType;

/** Built-in state tagged `"IDLE"`. */
type IdleState = { type: "IDLE" };

/** Built-in state tagged `"STOPPED"`. */
type StoppedState = { type: "STOPPED" };

/**
 * The states a snapshot may report: the two built-in states plus whatever
 * custom states (`TState`) the concrete actor defines.
 */
type ActorState<TState extends WithStringType> = IdleState | StoppedState | TState;
/**
 * Contract every actor exposes: it accepts events, publishes state changes
 * to subscribers, reports its current state on demand, and can be stopped.
 *
 * @typeParam TEvent The events this actor understands.
 * @typeParam TState The custom states this actor can be in.
 */
export type Actor<TEvent extends MachineEvent, TState extends MachineState> = {
  /**
   * Hands an event to the actor for processing.
   *
   * There is no guarantee that the state changes synchronously, or that it
   * changes at all, as a result of the event.
   *
   * @param event The event for the actor to process.
   */
  send(event: TEvent): void;

  /**
   * Registers a callback that is invoked whenever the actor's internal state
   * updates.
   *
   * @param listener Called with the new state on each update.
   * @param option Extra subscription settings. The meaning of `immediate` is
   *   left to the implementation (presumably: also deliver the current state
   *   right away; confirm against the concrete actor).
   * @returns A function of type `Unsubscribe`.
   */
  subscribe(
    listener: (state: TState) => void,
    option?: { immediate?: boolean }
  ): Unsubscribe;

  /**
   * Returns the actor's state as it is at the moment of the call.
   */
  getSnapshot(): ActorState<TState>;

  /**
   * Shuts the actor down for good; it processes no further events afterwards.
   */
  stop(): void;
};
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { createJobActor, type JobActor, type JobRunner } from "./job-actor"; | |
/**
 * Bookkeeping record the pool keeps for each pooled actor.
 */
type JobActorMeta<TInput, TResult> = {
  /** The pooled actor itself. */
  actor: JobActor<TInput, TResult>;
  /** Opaque tokens handed out by `acquire` that have not been freed yet. */
  references: Set<object>;
  /** Detaches the pool's own state listener from `actor`. */
  unsubscribe: () => void;
};
/**
 * Creates a keyed pool of job actors with reference counting.
 *
 * `acquire(key)` returns the actor registered for `key` (creating it on first
 * use, or replacing it when the pooled one reports `"STOPPED"`) together with
 * an opaque reference token. `free(reference)` returns that token.
 *
 * An entry is removed from the pool when its actor is `"STOPPED"`, or when no
 * tokens are outstanding and the actor is `"IDLE"`, `"DONE"`, `"CANCELED"` or
 * `"FAILED"`. Removal only detaches the pool's listener and forgets the
 * entry; it does not call `stop()` on the actor.
 *
 * @param init Produces the job runner for a key; called each time an actor
 *   has to be created for that key.
 * @returns An object with the `acquire` and `free` operations.
 */
export function createJobPool<TKey, TInput, TResult>(
  init: (key: TKey) => JobRunner<TInput, TResult>
) {
  type Meta = JobActorMeta<TInput, TResult>;

  const actorPool = new Map<TKey, Meta>();
  // The key is boxed in a 1-tuple so that a stored key of `undefined` is
  // still distinguishable from "no entry".
  const referenceToKey = new WeakMap<object, [TKey]>();

  /** Detaches the pool listener and drops the entry for `key`, if present. */
  function deleteActor(key: TKey): void {
    const meta = actorPool.get(key);
    if (meta === undefined) return;
    meta.unsubscribe();
    actorPool.delete(key);
  }

  /** Evicts the entry for `key` when its actor no longer needs pooling. */
  function clearUnused(key: TKey): void {
    const meta = actorPool.get(key);
    if (meta === undefined) return;

    const stateType = meta.actor.getSnapshot().type;

    // A stopped actor is useless to everyone: evict regardless of references.
    if (stateType === "STOPPED") {
      deleteActor(key);
      return;
    }

    const isAtRest =
      stateType === "DONE" ||
      stateType === "IDLE" ||
      stateType === "CANCELED" ||
      stateType === "FAILED";

    // An actor at rest is only evicted once nobody holds a reference to it.
    if (isAtRest && meta.references.size <= 0) {
      deleteActor(key);
    }
  }

  /** Builds a fresh actor for `key`, wires up eviction, and registers it. */
  function newActorMeta(key: TKey) {
    const actor = createJobActor(init(key));
    // Re-evaluate eviction on every state change of the actor.
    const unsubscribe = actor.subscribe(() => {
      clearUnused(key);
    });
    const meta = { actor, references: new Set<object>(), unsubscribe };
    actorPool.set(key, meta);
    return meta;
  }

  /** Discards whatever is registered under `key` and registers a new actor. */
  function replaceActorMeta(key: TKey) {
    deleteActor(key);
    return newActorMeta(key);
  }

  /** Returns the actor for `key` plus a token to pass to `free` later. */
  function acquire(key: TKey): [JobActor<TInput, TResult>, object] {
    let meta: Meta = actorPool.get(key) ?? newActorMeta(key);
    if (meta.actor.getSnapshot().type === "STOPPED") {
      meta = replaceActorMeta(key);
    }

    const reference = {};
    meta.references.add(reference);
    referenceToKey.set(reference, [key]);
    return [meta.actor, reference];
  }

  /** Returns a token obtained from `acquire`; unknown tokens are ignored. */
  function free(reference: object): void {
    const boxedKey = referenceToKey.get(reference);
    referenceToKey.delete(reference);
    if (boxedKey === undefined || boxedKey.length < 1) return;

    const key = boxedKey[0];
    const meta = actorPool.get(key);
    if (meta === undefined) return;

    meta.references.delete(reference);
    clearUnused(key);
  }

  return { acquire, free };
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import { gate, subject } from "../../utils"; | |
| import type { Actor } from "./actor"; | |
/**
 * Events a job actor accepts: start a job with some input, or cancel the job
 * that is currently running.
 */
type JobRunnerEvent<TInput> =
  | { type: "RUN_JOB"; input: TInput }
  | { type: "CANCEL_RUNNING_JOB" };

/**
 * States a job actor can report. `"DONE"` carries the job's result and
 * `"FAILED"` carries whatever value the job rejected with.
 */
type JobRunnerState<TResult> =
  | { type: "IDLE" }
  | { type: "RUNNING" }
  | { type: "DONE"; result: TResult }
  | { type: "STOPPED" }
  | { type: "CANCELED" }
  | { type: "FAILED"; error: unknown };

/** An actor specialised to the job events and job states above. */
export type JobActor<TInput, TResult> = Actor<
  JobRunnerEvent<TInput>,
  JobRunnerState<TResult>
>;

/**
 * The asynchronous work a job actor performs.
 *
 * @param input The payload of the `RUN_JOB` event.
 * @param onStop The `listen` member of a `gate()`; presumably lets the job
 *   register a callback for when it is cancelled or stopped (confirm against
 *   the `gate` helper).
 * @returns A promise for the job's result.
 */
export type JobRunner<TInput, TResult> = (
  input: TInput,
  onStop: ReturnType<typeof gate>["listen"]
) => Promise<TResult>;
/**
 * Creates an actor that runs at most one asynchronous job at a time.
 *
 * Lifecycle: the actor starts in `"IDLE"`. `RUN_JOB` moves it to `"RUNNING"`
 * (ignored while a job is already running). When the job's promise settles
 * the actor moves to `"DONE"` or `"FAILED"`. `CANCEL_RUNNING_JOB` moves a
 * running actor to `"CANCELED"`, and `stop()` moves it to `"STOPPED"`, after
 * which every event is rejected with a console error.
 *
 * A job that settles after it was cancelled, stopped, or superseded by a
 * newer job is ignored: it neither changes the state nor touches the newer
 * job's stop handle.
 *
 * @param fn The job to run for each `RUN_JOB` event. It receives the event's
 *   input and the `listen` member of a fresh `gate()`.
 * @returns The job actor.
 */
export const createJobActor = <TInput, TResult>(
  fn: JobRunner<TInput, TResult>
): JobActor<TInput, TResult> => {
  type LocalState = JobRunnerState<TResult>;

  const jobSubject = subject<LocalState>();
  let currentState: LocalState = { type: "IDLE" };
  // Opens the stop gate of the job in flight; null when no job is in flight.
  let runningJobStopper: (() => void) | null = null;

  /** Signals the in-flight job (if any) to stop and forgets its handle. */
  const stopRunningJob = () => {
    runningJobStopper?.();
    runningJobStopper = null;
  };

  /** Records the new state and publishes it to subscribers. */
  function setState(state: LocalState) {
    currentState = state;
    jobSubject.next(state);
  }

  /** Starts `fn` with `input` and wires its outcome into the state machine. */
  function runJob(input: TInput) {
    const stopGate = gate();
    runningJobStopper = stopGate.open;

    // The async wrapper turns a synchronous throw from `fn` into a rejection.
    const promise = (async () => {
      return fn(input, stopGate.listen);
    })();

    // This object doubles as the identity of this particular run: a later
    // run publishes a different object even though the `type` tag is equal.
    const runningState = { type: "RUNNING" } satisfies LocalState;
    setState(runningState);

    const settle = (next: LocalState) => {
      // Comparing by identity rather than by `type` rejects stale jobs.
      // Checking only `type === "RUNNING"` would let a cancelled job that
      // settles during a newer run overwrite that run's state and wipe its
      // stop handle.
      if (currentState !== runningState) return;
      runningJobStopper = null;
      setState(next);
    };

    void promise.then(
      (result) => settle({ type: "DONE", result }),
      (error: unknown) => settle({ type: "FAILED", error })
    );
  }

  return {
    send: (event) => {
      if (currentState.type === "STOPPED") {
        console.error("The actor has already been stopped");
        return;
      }
      switch (event.type) {
        case "RUN_JOB":
          // Only one job at a time: ignore the request while one is running.
          if (currentState.type !== "RUNNING") {
            runJob(event.input);
          }
          break;
        case "CANCEL_RUNNING_JOB":
          if (currentState.type === "RUNNING") {
            setState({ type: "CANCELED" });
            stopRunningJob();
          }
          break;
      }
    },
    subscribe: (listener, option) => {
      // The initial IDLE state is never pushed through the subject, so it is
      // delivered by hand when the caller asks for an immediate notification.
      if (option?.immediate && currentState.type === "IDLE") {
        listener(currentState);
      }
      return jobSubject.listen(listener, option?.immediate ?? false);
    },
    getSnapshot: () => currentState,
    stop: () => {
      setState({ type: "STOPPED" });
      stopRunningJob();
    },
  };
};
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment