Created
April 23, 2025 04:32
-
-
Save CoffeeVampir3/eff004c22e4f377d7ec14855b818a497 to your computer and use it in GitHub Desktop.
router thing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { ServerMessage, ClientMessage, StringMessage } from "./types.ts"; | |
/**
 * Server-side wrapper around a single client WebSocket.
 *
 * Outgoing traffic is serialized as JSON `ServerMessage`s. Incoming text
 * frames are parsed as JSON; `"string"` messages are echoed back, while
 * `"beginStreaming"` / `"endStreaming"` requests are parked in a single slot
 * that a poller drains through {@link ClientStream.checkAndClearControlMessage}.
 */
export class ClientStream {
  private readonly socket: WebSocket;

  /**
   * Most recent control request not yet picked up by the poller.
   * There is only one slot: a newer request overwrites an unread older one.
   */
  private pendingControlMessage: "beginStreaming" | "endStreaming" | null = null;

  /**
   * @param socket WebSocket to wrap; listeners are attached immediately.
   */
  constructor(socket: WebSocket) {
    this.socket = socket;
    this.setupListeners();
  }

  /**
   * Sends one chunk of streamed text to the client.
   *
   * @param wordChunk Text to deliver as the payload of a `"string"` message.
   * @returns `true` if the message was handed to the socket, `false` otherwise.
   */
  sendStreamData(wordChunk: string): boolean {
    const message: StringMessage = { type: "string", payload: wordChunk };
    return this.sendMessage(message);
  }

  /**
   * Tells the client that the current stream is complete.
   *
   * @returns `true` if the message was handed to the socket, `false` otherwise.
   */
  sendStreamEnd(): boolean {
    return this.sendMessage({ type: "streamEnd" });
  }

  /**
   * Returns the pending control request (if any) and clears the slot, so each
   * request is observed at most once.
   *
   * @returns The pending request, or `null` when nothing is pending.
   */
  checkAndClearControlMessage(): "beginStreaming" | "endStreaming" | null {
    const message = this.pendingControlMessage;
    this.pendingControlMessage = null;
    return message;
  }

  /**
   * Closes the socket unless it is already closing or closed.
   *
   * @param code Optional close code forwarded to `WebSocket.close`.
   * @param reason Optional close reason forwarded to `WebSocket.close`.
   */
  close(code?: number, reason?: string): void {
    if (
      this.socket.readyState === WebSocket.OPEN ||
      this.socket.readyState === WebSocket.CONNECTING
    ) {
      this.socket.close(code, reason);
    }
  }

  /**
   * Serializes and sends a message if the socket is open.
   *
   * @returns `false` when the socket is not open or `send` throws.
   */
  private sendMessage(message: ServerMessage): boolean {
    if (this.socket.readyState !== WebSocket.OPEN) {
      return false;
    }
    try {
      this.socket.send(JSON.stringify(message));
      return true;
    } catch (error) {
      console.error("Failed to send WebSocket message:", error);
      return false;
    }
  }

  /** Wires the socket events to the handlers below. */
  private setupListeners(): void {
    this.socket.addEventListener("message", this.handleMessage);
    this.socket.addEventListener("error", this.handleError);
    this.socket.addEventListener("close", this.handleClose);
    this.socket.addEventListener("open", this.handleOpen);
  }

  private handleOpen = (): void => {
    console.log("WebSocket client connected. Ready to stream data.");
  };

  /**
   * Parses a text frame and dispatches it. Non-text frames are ignored, and
   * frames that do not decode to a JSON object are rejected up front instead
   * of being trusted as a `ClientMessage`.
   */
  private handleMessage = (event: MessageEvent): void => {
    if (typeof event.data !== "string") return;
    try {
      const parsed: unknown = JSON.parse(event.data);
      if (typeof parsed !== "object" || parsed === null) {
        console.warn(
          "[WS RECV] Ignoring non-object message from client:",
          event.data
        );
        return;
      }
      // Only the object shape is established here; per-type fields are
      // validated in processIncomingMessage before they are used.
      this.processIncomingMessage(parsed as ClientMessage);
    } catch (error) {
      console.error(
        "Failed to parse or process incoming WebSocket message:",
        event.data,
        error
      );
    }
  };

  /**
   * Acts on a decoded client message: echoes `"string"` payloads and records
   * begin/end streaming requests for the poller.
   */
  private processIncomingMessage(message: ClientMessage): void {
    switch (message.type) {
      case "string": {
        // The payload comes from untrusted JSON, so check it at runtime even
        // though the static type promises a string.
        const payload: unknown = message.payload;
        if (typeof payload !== "string") {
          console.warn(
            '[WS RECV] Ignoring "string" message without a string payload'
          );
          break;
        }
        console.log(`[WS RECV] Client sent string: ${payload}`);
        this.sendStreamData(`Server received: ${payload}`);
        break;
      }
      case "beginStreaming": {
        console.log(`[WS RECV] Client requested to begin streaming`);
        this.pendingControlMessage = "beginStreaming";
        break;
      }
      case "endStreaming": {
        console.log(`[WS RECV] Client requested to end streaming`);
        this.pendingControlMessage = "endStreaming";
        break;
      }
      default: {
        const unknownType = (message as { type?: unknown }).type;
        console.warn(
          `[WS RECV] Received unhandled message type from client: ${String(unknownType)}`
        );
        break;
      }
    }
  }

  private handleError = (event: Event): void => {
    const msg = event instanceof ErrorEvent
      ? event.message
      : "Unknown WebSocket error";
    console.error("WebSocket error:", msg);
  };

  private handleClose = (event: CloseEvent): void => {
    console.log(
      `WebSocket client disconnected. Code: ${event.code}, Reason: "${event.reason}"`
    );
  };
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { | |
InferenceJobArguments, | |
InferenceProcessor, | |
} from "../frontend/InferenceProcessor.ts"; | |
import { ClientStream } from "../router/clientStream.ts"; | |
import { delay } from "https://deno.land/[email protected]/async/delay.ts"; | |
import { StreamingState } from "./types.ts"; | |
/** Per-client record tracked by the router. */
type StreamingJob = {
  /** Identifier the record is stored under. */
  id: string;
  /** Arguments handed to the inference processor when streaming begins. */
  jobArgs: InferenceJobArguments;
  /** Channel used to talk to the client that owns this job. */
  clientStream: ClientStream;
  /** Whether the job is currently idle (listening) or streaming. */
  state: StreamingState;
};
/**
 * Polls registered client streams for begin/end streaming requests and pumps
 * inference output to clients that asked for it.
 *
 * Each job is either LISTENING (idle) or STREAMING. A `beginStreaming`
 * request moves a LISTENING job to STREAMING and starts an inference run; an
 * `endStreaming` request (or the end of the run) moves it back.
 */
export class Router {
  /** Delay between two polling passes over the registered jobs. */
  private static readonly POLL_INTERVAL_MS = 10;

  private readonly processor: InferenceProcessor;
  private readonly tools: any[];
  private readonly jobs = new Map<string, StreamingJob>();
  private isRunning = false;

  /**
   * Bumped on every `start()`. A monitor loop that wakes up and finds a newer
   * generation exits, so `stop()` + `start()` inside one poll interval cannot
   * leave two loops running.
   */
  private monitorGeneration = 0;

  /**
   * Token of the inference run currently allowed to stream for each job.
   * A run whose token was replaced or removed stops at its next chunk, which
   * prevents an old run from interleaving output with a restarted one.
   */
  private readonly activeRuns = new Map<string, symbol>();

  /**
   * @param processor Inference backend used to create and stream jobs.
   * @param tools Tool definitions forwarded unchanged to `streamJob`.
   */
  constructor(processor: InferenceProcessor, tools: any[]) {
    this.processor = processor;
    this.tools = tools;
  }

  /** Starts the polling loop; a no-op if it is already running. */
  start(): void {
    if (this.isRunning) return;
    this.isRunning = true;
    const generation = ++this.monitorGeneration;
    this.monitorJobs(generation).catch((err) => {
      console.error("Router monitor loop terminated unexpectedly:", err);
      // Allow a later start() to recover, unless a newer loop already took over.
      if (this.monitorGeneration === generation) {
        this.isRunning = false;
      }
    });
  }

  /** Asks the polling loop to exit after its current pass. */
  stop(): void {
    this.isRunning = false;
  }

  /**
   * Registers a client socket as a new job in the LISTENING state.
   * The job is dropped automatically when the socket closes.
   *
   * @param jobArgs Arguments used for every inference run of this job.
   * @param socket Client WebSocket to wrap in a `ClientStream`.
   * @returns The generated job id.
   */
  addJob(jobArgs: InferenceJobArguments, socket: WebSocket): string {
    const jobId = crypto.randomUUID();
    const clientStream = new ClientStream(socket);
    this.jobs.set(jobId, {
      id: jobId,
      clientStream,
      state: StreamingState.LISTENING,
      jobArgs,
    });
    // Without this the map would keep one entry per connection forever.
    socket.addEventListener("close", () => this.removeJob(jobId), { once: true });
    console.log("Job added:", jobId);
    return jobId;
  }

  /** Forgets a job and invalidates any inference run still attached to it. */
  private removeJob(jobId: string): void {
    this.activeRuns.delete(jobId);
    if (this.jobs.delete(jobId)) {
      console.log("Job removed:", jobId);
    }
  }

  /**
   * Polling loop: checks every job for a pending control request, then sleeps.
   *
   * @param generation Value of `monitorGeneration` this loop was started with.
   */
  private async monitorJobs(generation: number): Promise<void> {
    while (this.isRunning && this.monitorGeneration === generation) {
      // Snapshot the entries so removals during the pass cannot affect iteration.
      for (const [jobId, job] of Array.from(this.jobs.entries())) {
        try {
          this.checkJobState(jobId, job);
        } catch (err) {
          // One misbehaving job must not stop polling for the others.
          console.error(`Error checking job ${jobId}:`, err);
        }
      }
      await delay(Router.POLL_INTERVAL_MS);
    }
  }

  /** Applies a pending begin/end request to the job's state machine. */
  private checkJobState(jobId: string, job: StreamingJob): void {
    const controlMessage = job.clientStream.checkAndClearControlMessage();
    if (
      controlMessage === "beginStreaming" &&
      job.state === StreamingState.LISTENING
    ) {
      job.state = StreamingState.STREAMING;
      // processJob handles its own failures; this catch is a last-resort log
      // so the fire-and-forget promise can never become an unhandled rejection.
      this.processJob(jobId).catch((err) => {
        console.error(`Error processing job ${jobId}:`, err);
      });
    } else if (
      controlMessage === "endStreaming" &&
      job.state === StreamingState.STREAMING
    ) {
      // The running processJob notices the state change at its next chunk.
      job.state = StreamingState.LISTENING;
    }
  }

  /**
   * Runs one inference stream for the job and forwards text chunks to the
   * client until the stream ends, the client asks to stop, the job is
   * removed, or a newer run for the same job takes over.
   */
  private async processJob(jobId: string): Promise<void> {
    const job = this.jobs.get(jobId);
    if (!job) return;

    const runToken = Symbol(jobId);
    this.activeRuns.set(jobId, runToken);

    // True while this run is still the one that should be feeding the client.
    const isCurrentRun = (): boolean =>
      this.activeRuns.get(jobId) === runToken &&
      this.jobs.get(jobId) === job &&
      job.state === StreamingState.STREAMING;

    try {
      const inferenceJob = this.processor.newJob(job.jobArgs);
      for await (const chunk of this.processor.streamJob(inferenceJob, this.tools)) {
        if (!isCurrentRun()) {
          break;
        }
        if (chunk.type === "chunk" && chunk.text) {
          job.clientStream.sendStreamData(chunk.text);
          if (chunk.tool && chunk.toolExecutor) {
            await chunk.toolExecutor();
          }
        }
      }
      // Only announce the end if the stream finished on its own.
      if (isCurrentRun()) {
        job.clientStream.sendStreamEnd();
        job.state = StreamingState.LISTENING;
      }
    } catch (error) {
      console.error(`Error in job ${jobId}:`, error);
      // Do not reset the state if a newer run has already taken over.
      if (isCurrentRun()) {
        job.state = StreamingState.LISTENING;
      }
    } finally {
      if (this.activeRuns.get(jobId) === runToken) {
        this.activeRuns.delete(jobId);
      }
    }
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Lifecycle state of a streaming job. */
export enum StreamingState {
  /** Idle: waiting for the client to request a stream. */
  LISTENING = "LISTENING",
  /** A stream is currently being sent to the client. */
  STREAMING = "STREAMING",
}
/** Every `type` tag that can appear on a WebSocket message. */
export type MessageType =
  | "string"
  | "streamEnd"
  | "beginStreaming"
  | "endStreaming";

/** Common shape of all messages: a discriminating `type` tag. */
export interface BaseMessage {
  type: MessageType;
}

/** Carries a text payload; sent in both directions. */
export interface StringMessage extends BaseMessage {
  type: "string";
  payload: string;
}

/** Server notification that the current stream has finished. */
export interface StreamEndMessage extends BaseMessage {
  type: "streamEnd";
}

/** Client request to start streaming. */
export interface BeginStreamingMessage extends BaseMessage {
  type: "beginStreaming";
}

/** Client request to stop streaming. */
export interface EndStreamingMessage extends BaseMessage {
  type: "endStreaming";
}

/** Messages the server may send to a client. */
export type ServerMessage =
  | StringMessage
  | StreamEndMessage;

/** Messages a client may send to the server. */
export type ClientMessage =
  | StringMessage
  | BeginStreamingMessage
  | EndStreamingMessage;
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment