Skip to content

Instantly share code, notes, and snippets.

@dmitry-stepanenko
Last active July 2, 2025 10:18
Show Gist options
  • Save dmitry-stepanenko/df94387da74abfb1e892894a7954406e to your computer and use it in GitHub Desktop.
Save dmitry-stepanenko/df94387da74abfb1e892894a7954406e to your computer and use it in GitHub Desktop.
import { Chat } from '@hashbrownai/core';
import { ChatCompletionStream } from 'openai/lib/ChatCompletionStream';
type UIObject = {
ui: Array<Record<string, unknown>>;
};
export async function* processChunksAsync(
stream: ChatCompletionStream<null>,
): AsyncGenerator<Chat.Api.CompletionChunk, void, unknown> {
let isCapturing = false;
let buffer: Chat.Api.CompletionChunk[] = [];
for await (const chunk of stream) {
const chunkMessage: Chat.Api.CompletionChunk = {
choices: chunk.choices.map(
(choice): Chat.Api.CompletionChunkChoice => ({
index: choice.index,
delta: {
content: choice.delta?.content,
role: choice.delta?.role,
toolCalls: choice.delta?.tool_calls,
},
finishReason: choice.finish_reason,
}),
),
};
const choices = chunkMessage.choices || [];
if (choices.length === 0) {
if (buffer.length) {
yield mergeBufferIntoFirst(buffer);
buffer = [];
isCapturing = false;
}
yield chunkMessage;
continue;
}
const choice = choices[0];
if (
choice.delta?.role === 'assistant' &&
choice.delta.content === '' &&
choice.finishReason === null
) {
buffer = [chunkMessage];
isCapturing = true;
continue;
}
if (isCapturing) {
const deltaKeys = Object.entries(choice.delta ?? {}).filter(
([, v]) => v != null,
);
const hasOtherPropsExceptContent =
deltaKeys.length !== 1 || deltaKeys[0][0] !== 'content';
if (choice.finishReason !== null || hasOtherPropsExceptContent) {
yield mergeBufferIntoFirst(buffer);
buffer = [];
isCapturing = false;
yield chunkMessage;
} else {
buffer.push(chunkMessage);
}
continue;
}
yield chunkMessage;
}
if (buffer.length) {
yield mergeBufferIntoFirst(buffer);
}
}
function mergeBufferIntoFirst(buffer: Chat.Api.CompletionChunk[]) {
const aggregatedChunk = buffer.reduce((acc, c) => {
acc.choices[0].delta.content! += c.choices[0].delta.content;
return acc;
});
const aggregatedContent = aggregatedChunk.choices[0].delta.content!;
const repaired = tryParseAndMergeJsonString(aggregatedContent);
aggregatedChunk.choices[0].delta.content = JSON.stringify(repaired);
return aggregatedChunk;
}
function isValidUIObject(obj: unknown): obj is UIObject {
return (
typeof obj === 'object' &&
obj !== null &&
Array.isArray((obj as UIObject).ui) &&
(obj as UIObject).ui.every(
(item) => typeof item === 'object' && item !== null,
)
);
}
function tryParseAndMergeJsonString(input: string): UIObject | null {
try {
const parsed = JSON.parse(input);
return parsed;
} catch (error: any) {
const message: string | undefined = error?.message;
const match = message?.match(/at position (\d+)/);
if (match) {
const index = Number(match[1]);
const before = input.slice(0, index).trim();
const after = input.slice(index).trim();
let left: UIObject | null = null;
let right: UIObject | null = null;
try {
const parsedLeft = JSON.parse(before);
if (isValidUIObject(parsedLeft)) {
left = parsedLeft;
}
} catch {
// ignore the error
}
try {
const parsedRight = JSON.parse(after);
if (isValidUIObject(parsedRight)) {
right = parsedRight;
}
} catch {
// ignore the error
}
if (left && right) {
return {
ui: [...left.ui, ...right.ui],
};
} else if (left) {
return left;
} else if (right) {
return right;
}
}
throw error;
}
}
diff --git a/packages/azure/src/stream/text.fn.ts b/packages/azure/src/stream/text.fn.ts
index f7f3f37..804df7e 100644
--- a/packages/azure/src/stream/text.fn.ts
+++ b/packages/azure/src/stream/text.fn.ts
@@ -1,6 +1,7 @@
import { Chat, encodeFrame, Frame } from '@hashbrownai/core';
import OpenAI, { AzureOpenAI } from 'openai';
import type { FunctionParameters } from 'openai/resources/shared';
+import { processChunksAsync } from './repair';
export interface AzureTextStreamOptions {
apiKey: string;
@@ -118,21 +119,7 @@ export async function* text(
const stream = client.beta.chat.completions.stream(resolvedOptions);
- for await (const chunk of stream) {
- const chunkMessage: Chat.Api.CompletionChunk = {
- choices: chunk.choices.map(
- (choice): Chat.Api.CompletionChunkChoice => ({
- index: choice.index,
- delta: {
- content: choice.delta?.content,
- role: choice.delta?.role,
- toolCalls: choice.delta?.tool_calls,
- },
- finishReason: choice.finish_reason,
- }),
- ),
- };
-
+ for await (const chunkMessage of processChunksAsync(stream)) {
const frame: Frame = {
type: 'chunk',
chunk: chunkMessage,
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment