Skip to content

Instantly share code, notes, and snippets.

@CoffeeVampir3
Created April 23, 2025 04:32
Show Gist options
  • Save CoffeeVampir3/eff004c22e4f377d7ec14855b818a497 to your computer and use it in GitHub Desktop.
Save CoffeeVampir3/eff004c22e4f377d7ec14855b818a497 to your computer and use it in GitHub Desktop.
router thing
import { ServerMessage, ClientMessage, StringMessage } from "./types.ts";
/**
 * Server-side wrapper around a single client WebSocket.
 *
 * Outgoing traffic is limited to `string` chunks and a `streamEnd` marker.
 * Incoming traffic is parsed as JSON and validated before it is acted on:
 * `string` messages are echoed back, while `beginStreaming` / `endStreaming`
 * requests are parked in a single slot that the owner polls with
 * {@link ClientStream.checkAndClearControlMessage}.
 *
 * NOTE: the slot holds only the most recent control request; if two requests
 * arrive between polls, the earlier one is overwritten.
 */
export class ClientStream {
  private readonly socket: WebSocket;
  private pendingControlMessage: "beginStreaming" | "endStreaming" | null = null;

  /**
   * @param socket The WebSocket to wrap. Event listeners are attached immediately.
   */
  constructor(socket: WebSocket) {
    this.socket = socket;
    this.setupListeners();
  }

  /**
   * Sends one chunk of streamed text to the client.
   *
   * @param wordChunk Text to deliver as the payload of a `string` message.
   * @returns `true` if the message was handed to the socket, `false` otherwise.
   */
  sendStreamData(wordChunk: string): boolean {
    const message: StringMessage = { type: "string", payload: wordChunk };
    return this.sendMessage(message);
  }

  /**
   * Tells the client that the current stream is complete.
   *
   * @returns `true` if the message was handed to the socket, `false` otherwise.
   */
  sendStreamEnd(): boolean {
    const message: { type: "streamEnd" } = { type: "streamEnd" };
    return this.sendMessage(message);
  }

  /**
   * Returns the pending control request, if any, and resets the slot.
   *
   * @returns The most recent unconsumed control request, or `null`.
   */
  checkAndClearControlMessage(): "beginStreaming" | "endStreaming" | null {
    const message = this.pendingControlMessage;
    this.pendingControlMessage = null;
    return message;
  }

  /**
   * Closes the socket if it is open or still connecting; otherwise does nothing.
   *
   * @param code Optional close code forwarded to `WebSocket.close`.
   * @param reason Optional close reason forwarded to `WebSocket.close`.
   */
  close(code?: number, reason?: string): void {
    if (
      this.socket.readyState === WebSocket.OPEN ||
      this.socket.readyState === WebSocket.CONNECTING
    ) {
      this.socket.close(code, reason);
    }
  }

  /** Serializes and sends a message; reports failure instead of throwing. */
  private sendMessage(message: ServerMessage): boolean {
    if (this.socket.readyState !== WebSocket.OPEN) {
      return false;
    }
    try {
      this.socket.send(JSON.stringify(message));
      return true;
    } catch (error) {
      console.error("Failed to send WebSocket message:", error);
      return false;
    }
  }

  /**
   * Runtime shape check for data decoded from the wire. `JSON.parse` can yield
   * any JSON value, so the message must be verified before it is trusted.
   */
  private static isClientMessage(value: unknown): value is ClientMessage {
    if (typeof value !== "object" || value === null) {
      return false;
    }
    const candidate = value as { type?: unknown; payload?: unknown };
    switch (candidate.type) {
      case "string":
        return typeof candidate.payload === "string";
      case "beginStreaming":
      case "endStreaming":
        return true;
      default:
        return false;
    }
  }

  private setupListeners(): void {
    this.socket.addEventListener("message", this.handleMessage);
    this.socket.addEventListener("error", this.handleError);
    this.socket.addEventListener("close", this.handleClose);
    this.socket.addEventListener("open", this.handleOpen);
  }

  /** Detaches every listener registered by `setupListeners`. */
  private teardownListeners(): void {
    this.socket.removeEventListener("message", this.handleMessage);
    this.socket.removeEventListener("error", this.handleError);
    this.socket.removeEventListener("close", this.handleClose);
    this.socket.removeEventListener("open", this.handleOpen);
  }

  private handleOpen = (): void => {
    console.log("WebSocket client connected. Ready to stream data.");
  };

  /** Parses, validates and dispatches one incoming frame; non-text frames are ignored. */
  private handleMessage = (event: MessageEvent): void => {
    try {
      if (typeof event.data !== "string") return;
      const parsed: unknown = JSON.parse(event.data);
      if (!ClientStream.isClientMessage(parsed)) {
        console.warn(
          "[WS RECV] Ignoring malformed or unsupported message from client:",
          event.data
        );
        return;
      }
      this.processIncomingMessage(parsed);
    } catch (error) {
      console.error(
        "Failed to parse or process incoming WebSocket message:",
        event.data,
        error
      );
    }
  };

  /** Acts on a validated client message. */
  private processIncomingMessage(message: ClientMessage): void {
    switch (message.type) {
      case "string": {
        console.log(`[WS RECV] Client sent string: ${message.payload}`);
        this.sendStreamData(`Server received: ${message.payload}`);
        break;
      }
      case "beginStreaming": {
        console.log(`[WS RECV] Client requested to begin streaming`);
        this.pendingControlMessage = "beginStreaming";
        break;
      }
      case "endStreaming": {
        console.log(`[WS RECV] Client requested to end streaming`);
        this.pendingControlMessage = "endStreaming";
        break;
      }
    }
  }

  private handleError = (event: Event): void => {
    const msg = event instanceof ErrorEvent
      ? event.message
      : "Unknown WebSocket error";
    console.error("WebSocket error:", msg);
  };

  private handleClose = (event: CloseEvent): void => {
    console.log(
      `WebSocket client disconnected. Code: ${event.code}, Reason: "${event.reason}"`
    );
    // A request that was never polled must not be acted on once the client is gone.
    this.pendingControlMessage = null;
    this.teardownListeners();
  };
}
import {
InferenceJobArguments,
InferenceProcessor,
} from "../frontend/InferenceProcessor.ts";
import { ClientStream } from "../router/clientStream.ts";
import { delay } from "https://deno.land/[email protected]/async/delay.ts";
import { StreamingState } from "./types.ts";
/** Book-keeping record the router keeps for each registered client connection. */
interface StreamingJob {
  /** Identifier generated when the job is added; also its key in the router's job map. */
  id: string;
  /** Inference arguments supplied when the job was added. */
  jobArgs: InferenceJobArguments;
  /** Message channel wrapping this job's WebSocket. */
  clientStream: ClientStream;
  /** Whether the job is waiting for a request or currently forwarding output. */
  state: StreamingState;
}
/**
 * Connects client WebSockets to the inference processor.
 *
 * Each added job wraps its socket in a `ClientStream`. While the router is
 * running, a polling loop checks every job for a pending control request:
 * `beginStreaming` starts forwarding inference chunks to the client and
 * `endStreaming` stops it.
 */
export class Router {
  private readonly processor: InferenceProcessor;
  private readonly tools: any[];
  private readonly jobs = new Map<string, StreamingJob>();
  /**
   * Token of the streaming run that currently owns each job. A run whose token
   * is no longer registered has been ended or superseded and must stop.
   */
  private readonly activeRuns = new Map<string, symbol>();
  private isRunning = false;
  /** Incremented on every `start()` so a loop from an earlier start can tell it is stale. */
  private monitorGeneration = 0;

  /**
   * @param processor Processor used to create and stream inference jobs.
   * @param tools Tool definitions passed through to `processor.streamJob`.
   */
  constructor(processor: InferenceProcessor, tools: any[]) {
    this.processor = processor;
    this.tools = tools;
  }

  /** Starts the polling loop. Calling it while already running is a no-op. */
  start(): void {
    if (this.isRunning) return;
    this.isRunning = true;
    const generation = ++this.monitorGeneration;
    this.monitorJobs(generation).catch((error: unknown) => {
      console.error("Job monitor loop stopped unexpectedly:", error);
      // Allow a later start() to recover, unless a newer loop already took over.
      if (generation === this.monitorGeneration) {
        this.isRunning = false;
      }
    });
  }

  /** Stops polling for control requests. Streams already in flight are not interrupted. */
  stop(): void {
    this.isRunning = false;
  }

  /**
   * Registers a new job for the given socket. The job is removed automatically
   * when the socket emits `close`.
   *
   * @param jobArgs Arguments used to create an inference job each time the client asks to stream.
   * @param socket The client's WebSocket.
   * @returns The generated job id.
   */
  addJob(jobArgs: InferenceJobArguments, socket: WebSocket): string {
    const jobId = crypto.randomUUID();
    const clientStream = new ClientStream(socket);
    this.jobs.set(jobId, {
      id: jobId,
      clientStream,
      state: StreamingState.LISTENING,
      jobArgs,
    });
    // Without this the map would keep one entry per connection forever.
    socket.addEventListener("close", () => this.removeJob(jobId), { once: true });
    console.log("Job added:", jobId);
    return jobId;
  }

  /** Forgets a job and signals any in-flight streaming run for it to stop. */
  private removeJob(jobId: string): void {
    const job = this.jobs.get(jobId);
    if (!job) return;
    job.state = StreamingState.LISTENING;
    this.activeRuns.delete(jobId);
    this.jobs.delete(jobId);
    console.log("Job removed:", jobId);
  }

  /** Polls every job roughly every 10 ms until stopped or superseded by a newer loop. */
  private async monitorJobs(generation: number): Promise<void> {
    while (this.isRunning && generation === this.monitorGeneration) {
      for (const [jobId, job] of this.jobs) {
        try {
          this.checkJobState(jobId, job);
        } catch (error) {
          // One misbehaving job must not take the whole loop down.
          console.error(`Error checking job ${jobId}:`, error);
        }
      }
      await delay(10);
    }
  }

  /** Applies a pending control request, if any, to the job's state. */
  private checkJobState(jobId: string, job: StreamingJob): void {
    const controlMessage = job.clientStream.checkAndClearControlMessage();
    if (controlMessage === "beginStreaming" && job.state === StreamingState.LISTENING) {
      job.state = StreamingState.STREAMING;
      const runToken = Symbol(jobId);
      this.activeRuns.set(jobId, runToken);
      // Fire and forget: processJob handles its own errors; this is a last-resort log.
      this.processJob(jobId, runToken).catch((err: unknown) => {
        console.error(`Error processing job ${jobId}:`, err);
      });
    } else if (controlMessage === "endStreaming" && job.state === StreamingState.STREAMING) {
      job.state = StreamingState.LISTENING;
      this.activeRuns.delete(jobId);
    }
  }

  /**
   * Runs one streaming pass: creates an inference job and forwards its text
   * chunks to the client until the stream finishes, the client ends it, the
   * run is superseded, or the socket stops accepting data.
   */
  private async processJob(jobId: string, runToken: symbol): Promise<void> {
    const job = this.jobs.get(jobId);
    if (!job) return;

    // True only while this exact run still owns the job. Checking the token as
    // well as the state prevents a run that was ended and then quickly
    // restarted from resuming alongside its replacement.
    const isCurrentRun = (): boolean =>
      this.activeRuns.get(jobId) === runToken &&
      job.state === StreamingState.STREAMING;

    try {
      const inferenceJob = this.processor.newJob(job.jobArgs);
      for await (const chunk of this.processor.streamJob(inferenceJob, this.tools)) {
        if (!isCurrentRun()) {
          break;
        }
        if (chunk.type === "chunk" && chunk.text) {
          if (!job.clientStream.sendStreamData(chunk.text)) {
            // The socket no longer accepts data; stop generating for it.
            break;
          }
          if (chunk.tool && chunk.toolExecutor) {
            await chunk.toolExecutor();
          }
        }
      }
      if (isCurrentRun()) {
        job.clientStream.sendStreamEnd();
      }
    } catch (error) {
      console.error(`Error in job ${jobId}:`, error);
    } finally {
      // Only the owning run may reset the state; a superseded run must not
      // flip a newer run back to LISTENING.
      if (this.activeRuns.get(jobId) === runToken) {
        this.activeRuns.delete(jobId);
        job.state = StreamingState.LISTENING;
      }
    }
  }
}
/** The two states a streaming job can be in. */
export enum StreamingState {
  /** Not currently forwarding output. */
  LISTENING = "LISTENING",
  /** Currently forwarding output to the client. */
  STREAMING = "STREAMING",
}
/** Every `type` tag that can appear on a WebSocket message, in either direction. */
export type MessageType =
  | "string"
  | "streamEnd"
  | "beginStreaming"
  | "endStreaming";
/** Shape shared by all messages: a `type` tag that identifies the variant. */
export interface BaseMessage {
  /** Discriminator naming the message variant. */
  type: MessageType;
}
/** A text message; part of both the server and client message unions. */
export interface StringMessage extends BaseMessage {
  type: "string";
  /** The text being carried. */
  payload: string;
}
/** Server-to-client marker with no payload, tagged `streamEnd`. */
export interface StreamEndMessage extends BaseMessage {
  type: "streamEnd";
}
/** Client-to-server message with no payload, tagged `beginStreaming`. */
export interface BeginStreamingMessage extends BaseMessage {
  type: "beginStreaming";
}
/** Client-to-server message with no payload, tagged `endStreaming`. */
export interface EndStreamingMessage extends BaseMessage {
  type: "endStreaming";
}
/** Messages the server may send to a client. */
export type ServerMessage =
  | StringMessage
  | StreamEndMessage;
/** Messages a client may send to the server. */
export type ClientMessage =
  | StringMessage
  | BeginStreamingMessage
  | EndStreamingMessage;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment