Created
April 29, 2025 03:45
-
-
Save CoffeeVampir3/dac504684ad966e6c3053644121d81dd to your computer and use it in GitHub Desktop.
wefjuejwuiofwjeuifchwie9ofh8i0923hf89023hf892h89fh298fh289f3h892hf.ts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { StreamingWebSocket, CancellationError } from "./streamingWebSocket.ts";
/**
 * Runs one client-side streaming inference exchange over `socket`.
 *
 * Protocol steps, in order:
 * 1. Send an `InferenceMessage` request.
 * 2. Wait for the server's `StreamEstablishMessage` and read its `id`.
 * 3. Register that id on the socket and reply with `StreamReadyMessage`.
 * 4. Forward the `content` of every `StreamContentMessage` to `onContent`
 *    until a `StreamFinishMessage` arrives, the wait is cancelled, or the
 *    socket reports that it is no longer open.
 * 5. Send a `StreamFinishMessage` carrying the stream id as acknowledgment.
 *
 * A `CancellationError` thrown while waiting for stream messages (step 4)
 * ends the loop and is not rethrown; any other error propagates to the
 * caller. The socket's stream id is cleared on every exit path.
 *
 * @param socket - Connection used to send and receive protocol messages.
 * @param onContent - Callback invoked with each received content payload.
 * @throws Any non-cancellation error raised by the socket operations or by
 *   `onContent`.
 */
async function streamInference(
  socket: StreamingWebSocket,
  onContent: (content: string) => void
): Promise<void> {
  try {
    // 1. Send the inference request.
    await socket.sendMessage({
      type: "Message",
      name: "InferenceMessage"
    });

    // 2. Await the server's stream-establish message and adopt its id.
    const establishMsg = await socket.waitForMessage("StreamEstablishMessage");
    const streamId = establishMsg.id;
    socket.setStreamId(streamId);

    // 3. Tell the server we are ready to receive on that stream.
    await socket.sendMessage({
      type: "Message",
      name: "StreamReadyMessage",
      id: streamId
    });

    // 4. Pump messages until finish, cancellation, or a closed socket.
    while (socket.isOpen()) {
      let result;
      try {
        result = await socket.waitForMessageOrCancellation([
          "StreamContentMessage",
          "StreamFinishMessage"
        ]);
      } catch (error) {
        // Cancellation is a normal way for the stream to end.
        if (error instanceof CancellationError) {
          break;
        }
        throw error;
      }

      if (result.type === "StreamFinishMessage") {
        break;
      }

      const message = result.message;
      if (message && message.name === "StreamContentMessage") {
        onContent(message.content);
      }
    }

    // 5. Send the acknowledgment.
    await socket.sendMessage({
      type: "Message",
      name: "StreamFinishMessage",
      id: streamId
    });
  } finally {
    // Single cleanup point: runs on success and on every error path.
    socket.clearStreamId();
  }
}
/**
 * Asks the server to cancel the stream currently registered on `socket`.
 *
 * Sends a `StreamCancelMessage` carrying `socket.activeStreamId`. When that
 * id is falsy (no active stream), nothing is sent.
 *
 * @param socket - Connection whose active stream should be cancelled.
 */
async function cancelStream(socket: StreamingWebSocket): Promise<void> {
  // Snapshot the id so the value that is checked is the value that is sent.
  const streamId = socket.activeStreamId;
  if (!streamId) {
    return;
  }

  await socket.sendMessage({
    type: "Message",
    name: "StreamCancelMessage",
    id: streamId
  });
}
/**
 * Example usage:
 *
 * // Set up WebSocket connection
 * const websocket = new WebSocket("ws://example.com/stream");
 * const socket = new StreamingWebSocket(websocket);
 *
 * // Wait for connection
 * await socket.waitForOpen();
 *
 * // Function to handle content
 * function handleContent(content: string) {
 *   console.log(`Received: ${content}`);
 * }
 *
 * try {
 *   // Start streaming
 *   await streamInference(socket, handleContent);
 *   console.log("Stream completed");
 * } catch (error) {
 *   if (error instanceof CancellationError) {
 *     console.log("Stream was cancelled");
 *   } else {
 *     console.error("Error:", error);
 *   }
 * }
 *
 * // To cancel:
 * // await cancelStream(socket);
 */
export { streamInference, cancelStream };
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment