Last active February 25, 2025 13:48
Simple actors with async and TransformStream
import { type Receive, type Send, useMailbox } from "./mailbox.ts";
/**
 * Capabilities handed to an actor when it is spawned.
 *
 * Every operation is typed against the single message type `Msg` shared by
 * the whole runtime.
 */
export type Context<Msg> = {
  /** Id under which this actor was spawned. */
  self: string;
  /** Deliver `msg` to the actor registered under `id`. */
  send: (id: string, msg: Msg) => Promise<void>;
  /** Take the next message from this actor's own mailbox. */
  receive: Receive<Msg>;
  /** Start `actor` under `id`; returns that same id. */
  spawn: (id: string, actor: Actor<Msg>) => string;
};
/**
 * An actor is an async function driven by its {@link Context}.
 * The returned promise settles when the actor stops running.
 */
export type Actor<Msg> = (
  ctx: Context<Msg>,
) => Promise<void>;
/** Produce a fresh random UUID string, suitable for use as an actor id. */
export const uuid = () => {
  return crypto.randomUUID();
};
/**
 * Public surface of an actor runtime, as returned by `createRuntime`.
 */
export type Runtime<Msg> = {
  /** Deliver `msg` to the actor registered under `id`. */
  send: (id: string, msg: Msg) => Promise<void>;
  /** Start `actor` under `id`; returns that same id. */
  spawn: (id: string, actor: Actor<Msg>) => string;
};
/**
 * Create an actor runtime: a private registry that maps actor ids to mailbox
 * addresses, together with `send` and `spawn` operations bound to it.
 *
 * @returns The runtime's `send` and `spawn` functions.
 */
export const createRuntime = <Msg>(): Runtime<Msg> => {
  // Actor id -> function that writes into that actor's mailbox.
  const addresses: Map<string, Send<Msg>> = new Map();

  /**
   * Deliver `msg` to the actor registered under `id`.
   * Sending to an unknown id is reported with a warning and otherwise ignored.
   */
  const send = async (id: string, msg: Msg): Promise<void> => {
    const address = addresses.get(id);
    if (address == null) {
      console.warn(
        "Message sent to actor address that does not exist.",
        id,
      );
      // Without this early exit the code below would call `undefined`.
      return;
    }
    console.log("runtime.send", id, msg);
    await address(msg);
  };

  /**
   * Register a new mailbox under `id` and start `actor` against it.
   * The actor runs in the background; `spawn` returns immediately.
   * Spawning under an id that is already registered replaces that
   * registration.
   */
  const spawn = (
    id: string,
    actor: Actor<Msg>,
  ): string => {
    const [mailbox, address] = useMailbox<Msg>();
    addresses.set(id, address);

    // Fire-and-forget: report failures instead of leaving an unhandled
    // rejection, and drop the registry entry once the actor has finished so
    // the map does not accumulate addresses of dead actors.
    void actor({
      self: id,
      send,
      receive: mailbox,
      spawn,
    })
      .catch((error: unknown) => {
        console.error("runtime.spawn: actor failed", id, error);
      })
      .finally(() => {
        // Only remove our own registration; the id may have been re-spawned.
        if (addresses.get(id) === address) {
          addresses.delete(id);
        }
      });

    return id;
  };

  return { send, spawn };
};
/** Write one message into a mailbox. */
export type Send<Msg> = (msg: Msg) => Promise<void>;

/**
 * Read the next message from a mailbox.
 * Resolves with `undefined` when the underlying read yields no value.
 */
export type Receive<Msg> = () => Promise<Msg | undefined>;

/**
 * Create a mailbox - a message queue that you can send messages to,
 * or receive messages from cooperatively.
 *
 * The queue is an identity `TransformStream`: `send` writes to its writable
 * side and `receive` reads from its readable side.
 *
 * @param maxQueueSize High-water mark used for the writable side's queuing
 *   strategy. Defaults to 1024.
 * @returns A `[receive, send]` pair bound to the same queue.
 */
export const useMailbox = <Msg>(
  maxQueueSize = 1024,
): [Receive<Msg>, Send<Msg>] => {
  // Typing both sides as Msg makes the `as` casts on the streams unnecessary.
  const { readable, writable } = new TransformStream<Msg, Msg>(undefined, {
    highWaterMark: maxQueueSize,
  });
  const writer = writable.getWriter();
  const reader = readable.getReader();

  const send = async (msg: Msg): Promise<void> => {
    await writer.write(msg);
  };

  const receive = async (): Promise<Msg | undefined> => {
    const { value } = await reader.read();
    return value;
  };

  return [receive, send];
};
import { type Context, createRuntime, uuid } from "./actor.ts";
/** Message tagged "ping"; `replyTo` names the actor that should be answered. */
type Ping = {
  type: "ping";
  replyTo: string;
};

/** Message tagged "pong"; `replyTo` names the actor that should be answered. */
type Pong = {
  type: "pong";
  replyTo: string;
};

/** Every message the demo actors exchange. */
type Msg =
  | Ping
  | Pong;
/**
 * Actor that answers every message it receives by sending a "ping" to the
 * message's `replyTo` address, naming itself as the next `replyTo`.
 *
 * Stops when `receive` resolves with no message.
 */
const ping = async (ctx: Context<Msg>): Promise<void> => {
  // A loop rather than `return ping(ctx)`: a long-running actor would
  // otherwise build an ever-growing chain of pending promises.
  for (;;) {
    const msg = await ctx.receive();
    // `receive` may resolve with undefined; there is nobody to reply to then.
    if (msg == null) return;
    await ctx.send(msg.replyTo, { type: "ping", replyTo: ctx.self });
  }
};
/**
 * Actor that answers every message it receives by sending a "pong" to the
 * message's `replyTo` address, naming itself as the next `replyTo`.
 *
 * Stops when `receive` resolves with no message.
 */
const pong = async (ctx: Context<Msg>): Promise<void> => {
  // A loop rather than `return pong(ctx)`: a long-running actor would
  // otherwise build an ever-growing chain of pending promises.
  for (;;) {
    const msg = await ctx.receive();
    // `receive` may resolve with undefined; there is nobody to reply to then.
    if (msg == null) return;
    await ctx.send(msg.replyTo, { type: "pong", replyTo: ctx.self });
  }
};
/**
 * Entry-point actor: spawns the `ping` and `pong` actors under fresh ids and
 * starts the exchange by sending `ping` a "pong" message whose `replyTo` is
 * the `pong` actor.
 */
const main = async (
  { send, spawn }: Context<Msg>,
): Promise<void> => {
  const pingId = spawn(uuid(), ping);
  const pongId = spawn(uuid(), pong);
  await send(pingId, {
    type: "pong",
    replyTo: pongId,
  });
};
// Boot a runtime for the demo's message type and launch `main` under a
// freshly generated id.
const runtime = createRuntime<Msg>();
const mainId = uuid();
runtime.spawn(mainId, main);
