Last active
July 2, 2025 10:18
-
-
Save dmitry-stepanenko/df94387da74abfb1e892894a7954406e to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import { Chat } from '@hashbrownai/core'; | |
import { ChatCompletionStream } from 'openai/lib/ChatCompletionStream'; | |
type UIObject = { | |
ui: Array<Record<string, unknown>>; | |
}; | |
export async function* processChunksAsync( | |
stream: ChatCompletionStream<null>, | |
): AsyncGenerator<Chat.Api.CompletionChunk, void, unknown> { | |
let isCapturing = false; | |
let buffer: Chat.Api.CompletionChunk[] = []; | |
for await (const chunk of stream) { | |
const chunkMessage: Chat.Api.CompletionChunk = { | |
choices: chunk.choices.map( | |
(choice): Chat.Api.CompletionChunkChoice => ({ | |
index: choice.index, | |
delta: { | |
content: choice.delta?.content, | |
role: choice.delta?.role, | |
toolCalls: choice.delta?.tool_calls, | |
}, | |
finishReason: choice.finish_reason, | |
}), | |
), | |
}; | |
const choices = chunkMessage.choices || []; | |
if (choices.length === 0) { | |
if (buffer.length) { | |
yield mergeBufferIntoFirst(buffer); | |
buffer = []; | |
isCapturing = false; | |
} | |
yield chunkMessage; | |
continue; | |
} | |
const choice = choices[0]; | |
if ( | |
choice.delta?.role === 'assistant' && | |
choice.delta.content === '' && | |
choice.finishReason === null | |
) { | |
buffer = [chunkMessage]; | |
isCapturing = true; | |
continue; | |
} | |
if (isCapturing) { | |
const deltaKeys = Object.entries(choice.delta ?? {}).filter( | |
([, v]) => v != null, | |
); | |
const hasOtherPropsExceptContent = | |
deltaKeys.length !== 1 || deltaKeys[0][0] !== 'content'; | |
if (choice.finishReason !== null || hasOtherPropsExceptContent) { | |
yield mergeBufferIntoFirst(buffer); | |
buffer = []; | |
isCapturing = false; | |
yield chunkMessage; | |
} else { | |
buffer.push(chunkMessage); | |
} | |
continue; | |
} | |
yield chunkMessage; | |
} | |
if (buffer.length) { | |
yield mergeBufferIntoFirst(buffer); | |
} | |
} | |
function mergeBufferIntoFirst(buffer: Chat.Api.CompletionChunk[]) { | |
const aggregatedChunk = buffer.reduce((acc, c) => { | |
acc.choices[0].delta.content! += c.choices[0].delta.content; | |
return acc; | |
}); | |
const aggregatedContent = aggregatedChunk.choices[0].delta.content!; | |
const repaired = tryParseAndMergeJsonString(aggregatedContent); | |
aggregatedChunk.choices[0].delta.content = JSON.stringify(repaired); | |
return aggregatedChunk; | |
} | |
function isValidUIObject(obj: unknown): obj is UIObject { | |
return ( | |
typeof obj === 'object' && | |
obj !== null && | |
Array.isArray((obj as UIObject).ui) && | |
(obj as UIObject).ui.every( | |
(item) => typeof item === 'object' && item !== null, | |
) | |
); | |
} | |
function tryParseAndMergeJsonString(input: string): UIObject | null { | |
try { | |
const parsed = JSON.parse(input); | |
return parsed; | |
} catch (error: any) { | |
const message: string | undefined = error?.message; | |
const match = message?.match(/at position (\d+)/); | |
if (match) { | |
const index = Number(match[1]); | |
const before = input.slice(0, index).trim(); | |
const after = input.slice(index).trim(); | |
let left: UIObject | null = null; | |
let right: UIObject | null = null; | |
try { | |
const parsedLeft = JSON.parse(before); | |
if (isValidUIObject(parsedLeft)) { | |
left = parsedLeft; | |
} | |
} catch { | |
// ignore the error | |
} | |
try { | |
const parsedRight = JSON.parse(after); | |
if (isValidUIObject(parsedRight)) { | |
right = parsedRight; | |
} | |
} catch { | |
// ignore the error | |
} | |
if (left && right) { | |
return { | |
ui: [...left.ui, ...right.ui], | |
}; | |
} else if (left) { | |
return left; | |
} else if (right) { | |
return right; | |
} | |
} | |
throw error; | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/packages/azure/src/stream/text.fn.ts b/packages/azure/src/stream/text.fn.ts | |
index f7f3f37..804df7e 100644 | |
--- a/packages/azure/src/stream/text.fn.ts | |
+++ b/packages/azure/src/stream/text.fn.ts | |
@@ -1,6 +1,7 @@ | |
import { Chat, encodeFrame, Frame } from '@hashbrownai/core'; | |
import OpenAI, { AzureOpenAI } from 'openai'; | |
import type { FunctionParameters } from 'openai/resources/shared'; | |
+import { processChunksAsync } from './repair'; | |
export interface AzureTextStreamOptions { | |
apiKey: string; | |
@@ -118,21 +119,7 @@ export async function* text( | |
const stream = client.beta.chat.completions.stream(resolvedOptions); | |
- for await (const chunk of stream) { | |
- const chunkMessage: Chat.Api.CompletionChunk = { | |
- choices: chunk.choices.map( | |
- (choice): Chat.Api.CompletionChunkChoice => ({ | |
- index: choice.index, | |
- delta: { | |
- content: choice.delta?.content, | |
- role: choice.delta?.role, | |
- toolCalls: choice.delta?.tool_calls, | |
- }, | |
- finishReason: choice.finish_reason, | |
- }), | |
- ), | |
- }; | |
- | |
+ for await (const chunkMessage of processChunksAsync(stream)) { | |
const frame: Frame = { | |
type: 'chunk', | |
chunk: chunkMessage, |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment