Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save CoffeeVampir3/dac504684ad966e6c3053644121d81dd to your computer and use it in GitHub Desktop.
Save CoffeeVampir3/dac504684ad966e6c3053644121d81dd to your computer and use it in GitHub Desktop.
wefjuejwuiofwjeuifchwie9ofh8i0923hf89023hf892h89fh298fh289f3h892hf.ts
import { StreamingWebSocket, CancellationError } from "./streamingWebSocket.ts";
/**
* Example of client-side streaming inference implementation
* Following the exact flow described in the specification
*/
async function streamInference(
socket: StreamingWebSocket,
onContent: (content: string) => void
): Promise<void> {
try {
// 1. Send InferenceRequest
await socket.sendMessage({
type: "Message",
name: "InferenceMessage"
});
// 2. Await StreamEstablishRequest from server
const establishMsg = await socket.waitForMessage("StreamEstablishMessage");
const streamId = establishMsg.id;
// Set the stream ID for filtering
socket.setStreamId(streamId);
// 3. Send StreamReady with the ID
await socket.sendMessage({
type: "Message",
name: "StreamReadyMessage",
id: streamId
});
// 4. Process content messages until we get a finish message
try {
// Create an async iterator for StreamContentMessage and StreamFinishMessage
const messageStream = {
[Symbol.asyncIterator]() {
return {
async next() {
// Check if socket is still connected
if (!socket.isOpen()) {
return { done: true, value: undefined };
}
try {
// Wait for either content or finish message
const result = await socket.waitForMessageOrCancellation([
"StreamContentMessage",
"StreamFinishMessage"
]);
// If it's a finish message, we're done
if (result.type === "StreamFinishMessage") {
return { done: true, value: result.message };
}
// Otherwise return the content message
return { done: false, value: result.message };
} catch (error) {
// If cancelled, end the iteration
if (error instanceof CancellationError) {
return { done: true, value: undefined };
}
throw error;
}
}
};
}
};
// Use for await loop to process messages
let finishMessage = null;
for await (const message of messageStream) {
// If it's a content message, call the callback with the content
if(message && message.name === "StreamContentMessage") {
onContent(message.content);
}
}
// 5. Send acknowledgment
await socket.sendMessage({
type: "Message",
name: "StreamFinishMessage",
id: streamId
});
} finally {
socket.clearStreamId();
}
} catch (error) {
// Handle errors
socket.clearStreamId();
throw error;
}
}
/**
* Cancels an active stream
*/
async function cancelStream(socket: StreamingWebSocket): Promise<void> {
if (socket.activeStreamId) {
await socket.sendMessage({
type: "Message",
name: "StreamCancelMessage",
id: socket.activeStreamId
});
}
}
/**
* Example usage:
*
* // Set up WebSocket connection
* const websocket = new WebSocket("ws://example.com/stream");
* const socket = new StreamingWebSocket(websocket);
*
* // Wait for connection
* await socket.waitForOpen();
*
* // Function to handle content
* function handleContent(content: string) {
* console.log(`Received: ${content}`);
* }
*
* try {
* // Start streaming
* await streamInference(socket, handleContent);
* console.log("Stream completed");
* } catch (error) {
* if (error instanceof CancellationError) {
* console.log("Stream was cancelled");
* } else {
* console.error("Error:", error);
* }
* }
*
* // To cancel:
* // await cancelStream(socket);
*/
export { streamInference, cancelStream };
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment