Last active
February 25, 2025 13:48
-
-
Save gordonbrander/eef2cebd6960114b80201f64b4960b73 to your computer and use it in GitHub Desktop.
Simple actors with async and TransformStream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { type Receive, type Send, useMailbox } from "./mailbox.ts"; | |
export type Context<Msg> = { | |
self: string; | |
send: (id: string, msg: Msg) => Promise<void>; | |
receive: Receive<Msg>; | |
spawn: (id: string, actor: Actor<Msg>) => string; | |
}; | |
export type Actor<Msg> = (context: Context<Msg>) => Promise<void>; | |
export const uuid = () => crypto.randomUUID(); | |
export type Runtime<Msg> = { | |
send: (id: string, msg: Msg) => Promise<void>; | |
spawn: (id: string, actor: Actor<Msg>) => string; | |
}; | |
export const createRuntime = <Msg>(): Runtime<Msg> => { | |
const addresses: Map<string, Send<Msg>> = new Map(); | |
const send = async (id: string, msg: Msg): Promise<void> => { | |
const address = addresses.get(id); | |
if (address == null) { | |
console.warn( | |
"Message sent to actor address that does not exist.", | |
id, | |
msg, | |
); | |
return; | |
} | |
console.log("runtime.send", id, msg); | |
await address(msg); | |
}; | |
const spawn = ( | |
id: string, | |
actor: Actor<Msg>, | |
): string => { | |
const [mailbox, address] = useMailbox<Msg>(); | |
addresses.set(id, address); | |
actor({ | |
self: id, | |
send, | |
receive: mailbox, | |
spawn, | |
}); | |
return id; | |
}; | |
return { send, spawn }; | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
export type Send<Msg> = (msg: Msg) => Promise<void>; | |
export type Receive<Msg> = () => Promise<Msg | undefined>; | |
/** | |
* Create a mailbox - a message queue that you can send messages to, | |
* or recieve messages from cooperatively. | |
*/ | |
export const useMailbox = <Msg>( | |
maxQueueSize = 1024, | |
): [Receive<Msg>, Send<Msg>] => { | |
const { readable, writable } = new TransformStream<Msg>(undefined, { | |
highWaterMark: maxQueueSize, | |
}); | |
const writer = (writable as WritableStream<Msg>).getWriter(); | |
const reader = (readable as ReadableStream<Msg>).getReader(); | |
const send = async (msg: Msg): Promise<void> => { | |
return await writer.write(msg); | |
}; | |
const receive = async (): Promise<Msg | undefined> => { | |
const { value } = await reader.read(); | |
return value; | |
}; | |
return [receive, send]; | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { type Context, createRuntime, uuid } from "./actor.ts"; | |
type Ping = { type: "ping"; replyTo: string }; | |
type Pong = { type: "pong"; replyTo: string }; | |
type Msg = Ping | Pong; | |
const ping = async (ctx: Context<Msg>): Promise<void> => { | |
const msg = await ctx.receive(); | |
await ctx.send(msg.replyTo, { type: "ping", replyTo: ctx.self }); | |
return ping(ctx); | |
}; | |
const pong = async (ctx: Context<Msg>): Promise<void> => { | |
const msg = await ctx.receive(); | |
await ctx.send(msg.replyTo, { type: "pong", replyTo: ctx.self }); | |
return pong(ctx); | |
}; | |
const main = async ( | |
{ send, spawn }: Context<Msg>, | |
): Promise<void> => { | |
const pingId = spawn(uuid(), ping); | |
const pongId = spawn(uuid(), pong); | |
await send(pingId, { | |
type: "pong", | |
replyTo: pongId, | |
}); | |
}; | |
const runtime = createRuntime<Msg>(); | |
runtime.spawn(uuid(), main); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment