Skip to content

Instantly share code, notes, and snippets.

@gordonbrander
Last active February 25, 2025 13:48
Show Gist options
  • Save gordonbrander/eef2cebd6960114b80201f64b4960b73 to your computer and use it in GitHub Desktop.
Save gordonbrander/eef2cebd6960114b80201f64b4960b73 to your computer and use it in GitHub Desktop.
Simple actors with async and TransformStream
import { type Receive, type Send, useMailbox } from "./mailbox.ts";
export type Context<Msg> = {
self: string;
send: (id: string, msg: Msg) => Promise<void>;
receive: Receive<Msg>;
spawn: (id: string, actor: Actor<Msg>) => string;
};
export type Actor<Msg> = (context: Context<Msg>) => Promise<void>;
export const uuid = () => crypto.randomUUID();
export type Runtime<Msg> = {
send: (id: string, msg: Msg) => Promise<void>;
spawn: (id: string, actor: Actor<Msg>) => string;
};
export const createRuntime = <Msg>(): Runtime<Msg> => {
const addresses: Map<string, Send<Msg>> = new Map();
const send = async (id: string, msg: Msg): Promise<void> => {
const address = addresses.get(id);
if (address == null) {
console.warn(
"Message sent to actor address that does not exist.",
id,
msg,
);
return;
}
console.log("runtime.send", id, msg);
await address(msg);
};
const spawn = (
id: string,
actor: Actor<Msg>,
): string => {
const [mailbox, address] = useMailbox<Msg>();
addresses.set(id, address);
actor({
self: id,
send,
receive: mailbox,
spawn,
});
return id;
};
return { send, spawn };
};
export type Send<Msg> = (msg: Msg) => Promise<void>;
export type Receive<Msg> = () => Promise<Msg | undefined>;
/**
* Create a mailbox - a message queue that you can send messages to,
* or recieve messages from cooperatively.
*/
export const useMailbox = <Msg>(
maxQueueSize = 1024,
): [Receive<Msg>, Send<Msg>] => {
const { readable, writable } = new TransformStream<Msg>(undefined, {
highWaterMark: maxQueueSize,
});
const writer = (writable as WritableStream<Msg>).getWriter();
const reader = (readable as ReadableStream<Msg>).getReader();
const send = async (msg: Msg): Promise<void> => {
return await writer.write(msg);
};
const receive = async (): Promise<Msg | undefined> => {
const { value } = await reader.read();
return value;
};
return [receive, send];
};
import { type Context, createRuntime, uuid } from "./actor.ts";
type Ping = { type: "ping"; replyTo: string };
type Pong = { type: "pong"; replyTo: string };
type Msg = Ping | Pong;
const ping = async (ctx: Context<Msg>): Promise<void> => {
const msg = await ctx.receive();
await ctx.send(msg.replyTo, { type: "ping", replyTo: ctx.self });
return ping(ctx);
};
const pong = async (ctx: Context<Msg>): Promise<void> => {
const msg = await ctx.receive();
await ctx.send(msg.replyTo, { type: "pong", replyTo: ctx.self });
return pong(ctx);
};
const main = async (
{ send, spawn }: Context<Msg>,
): Promise<void> => {
const pingId = spawn(uuid(), ping);
const pongId = spawn(uuid(), pong);
await send(pingId, {
type: "pong",
replyTo: pongId,
});
};
const runtime = createRuntime<Msg>();
runtime.spawn(uuid(), main);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment