Last active
June 28, 2025 21:13
-
-
Save celsowm/e6a72f636fc025067460621bd05084b4 to your computer and use it in GitHub Desktop.
LLMJsonStreamParser.js
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * @typedef {'object' | 'array'} ContainerType
 */

/**
 * @typedef {Object} ParserEvent
 * @property {'objectStart' | 'objectEnd' | 'arrayStart' | 'arrayEnd' | 'key' | 'valueChunk' | 'value' | 'done' | 'error'} type
 * @property {(string|number)[]} [path] - Object keys / array indices leading from the
 *   root of the streamed document to the event's location (`[]` is the root itself).
 * @property {*} [value] - Decoded key name (`key` events) or decoded scalar (`value`
 *   events). Objects and arrays are reported only through start/end events.
 * @property {string} [chunk] - `valueChunk` only: a slice of a string value's source
 *   text exactly as it arrived, still JSON-escaped (an escaped newline arrives as a
 *   backslash followed by `n`). The decoded string is delivered by the `value` event.
 * @property {Error} [error]
 */

/**
 * Parser states. The numeric order is significant: `#parse` keeps consuming
 * input only while the state is below `DONE`.
 */
const STATE = Object.freeze({
  AWAITING_VALUE: 0,
  IN_OBJECT_AWAITING_KEY: 1,
  IN_OBJECT_AWAITING_COLON: 2,
  IN_OBJECT_AWAITING_VALUE: 3,
  IN_OBJECT_AWAITING_SEPARATOR: 4,
  IN_ARRAY_AWAITING_VALUE: 5,
  IN_ARRAY_AWAITING_SEPARATOR: 6,
  IN_STRING: 7,
  IN_NUMBER: 8,
  IN_LITERAL: 9, // true, false, null
  DONE: 10,
  ERROR: 11,
});

/**
 * A specialized JSON stream parser for Server-Sent Events (SSE) from LLM APIs.
 * It handles the SSE wrapping and the inner JSON stream in a single pass,
 * emitting structured events for keys, values, and content chunks.
 *
 * Each SSE `data:` line is parsed as JSON, and the string found at `jsonPointer`
 * is extracted. Those strings are concatenated and parsed incrementally as ONE
 * JSON document. Events are dispatched as `CustomEvent`s whose `detail` is a
 * {@link ParserEvent}; the event name equals `detail.type`.
 */
export class LLMJsonStreamParser extends EventTarget {
  // --- Private fields ---
  /** Reference tokens of the content pointer, already RFC 6901-decoded. */
  #jsonPointer;
  /** Trailing SSE text that has not been terminated by a newline yet. */
  #lineBuffer = "";
  /** Set once a `data: [DONE]` line has been handled. */
  #isDone = false;

  // Core JSON parser state
  #state = STATE.AWAITING_VALUE;
  /** Unconsumed JSON text; processed characters are sliced off after each pass. */
  #buffer = "";
  /** Read position inside `#buffer`. */
  #pos = 0;
  /** True while a backslash inside a string awaits its escaped character (survives chunk boundaries). */
  #isEscaped = false;
  /** Source text of the number/literal currently being read (survives chunk boundaries). */
  #valueBuffer = "";
  /**
   * Open containers, innermost last. `keyOrIndex` is the key (objects) or index
   * (arrays) currently being filled inside that container; it is `undefined`
   * before an array's first element and between members of an object.
   */
  #stack = [];
  /** Raw (still escaped) pieces of the string currently being read, across transform calls. */
  #stringChunks = [];

  /**
   * @param {object} [options]
   * @param {string} [options.jsonPointer=/choices/0/delta/content] - A JSON Pointer to the string field
   *   within each SSE `data` payload that contains the streaming JSON content.
   *   `~1` and `~0` escapes are decoded per RFC 6901; empty segments are ignored.
   */
  constructor({ jsonPointer = "/choices/0/delta/content" } = {}) {
    super();
    this.#jsonPointer = jsonPointer
      .split("/")
      .filter((token) => token)
      // RFC 6901 order: decode "~1" first, then "~0".
      .map((token) => token.replaceAll("~1", "/").replaceAll("~0", "~"));
  }

  /**
   * Processes an incoming chunk from the SSE stream.
   *
   * Complete lines are handled immediately; a trailing partial line is kept for
   * the next call. A `data: [DONE]` line calls {@link flush} and then emits `done`.
   * Calls after `[DONE]` or after an error are ignored.
   * @param {string} chunk - The chunk of text from the stream.
   */
  transform(chunk) {
    if (this.#isDone || this.#state === STATE.ERROR) return;
    this.#lineBuffer += chunk;
    const lines = this.#lineBuffer.split(/\r?\n/);
    this.#lineBuffer = lines.pop() || "";
    for (const line of lines) {
      if (this.#isDone) break;
      // Blank lines separate SSE events; lines starting with ':' are SSE comments.
      if (line.length === 0 || line.startsWith(":")) continue;
      // Only `data:` fields carry content; other SSE fields are ignored.
      if (!line.startsWith("data:")) continue;

      const payload = line.slice(5).trim();
      if (payload === "[DONE]") {
        this.flush();
        this.#isDone = true;
        this.#emit({ type: "done" });
        continue;
      }

      let sseJson;
      try {
        sseJson = JSON.parse(payload);
      } catch (e) {
        this.#emitError(
          `Falha ao parsear o payload do SSE: ${e.message}. Payload: "${payload}"`
        );
        continue;
      }
      const content = this.#getValueByPointer(sseJson);
      if (typeof content === "string" && content.length > 0) {
        this.#parseJsonChunk(content);
      }
    }
  }

  /**
   * Finalizes the parsing process. Should be called after the stream ends
   * (it is also called automatically on `data: [DONE]`).
   *
   * - An empty stream (no JSON started) finishes without an error.
   * - A top-level number or literal still waiting for a terminator is emitted
   *   as a `value` event.
   * - Any other unfinished document emits an `error` event.
   * Calling it again after completion or an error does nothing.
   */
  flush() {
    switch (this.#state) {
      case STATE.DONE:
      case STATE.ERROR:
        return;
      case STATE.AWAITING_VALUE:
        this.#state = STATE.DONE;
        return;
      case STATE.IN_NUMBER:
      case STATE.IN_LITERAL:
        // A top-level scalar has no closing delimiter; end of input terminates it.
        if (this.#stack.length === 0) {
          this.#finalizeSimpleValue(
            this.#state === STATE.IN_NUMBER ? "number" : "literal"
          );
          return;
        }
        break;
      default:
        break;
    }
    this.#emitError("JSON incompleto no final do fluxo");
  }

  /**
   * Resolves the configured pointer against a parsed SSE payload.
   * Only own properties are followed, so prototype members (e.g. `constructor`)
   * never match.
   * @private
   * @param {*} obj - Parsed SSE `data` payload.
   * @returns {*} The value at the pointer, or `undefined` if any segment is missing.
   */
  #getValueByPointer(obj) {
    let current = obj;
    for (const key of this.#jsonPointer) {
      if (
        current === null ||
        typeof current !== "object" ||
        !Object.hasOwn(current, key)
      ) {
        return undefined;
      }
      current = current[key];
    }
    return current;
  }

  // --- CORE JSON PARSER (ADAPTED) ---

  /** @private Dispatches `event` as a CustomEvent named after `event.type`. */
  #emit(event) {
    this.dispatchEvent(new CustomEvent(event.type, { detail: event }));
  }

  /**
   * Emits a single `error` event (further calls are no-ops) and moves the
   * parser to the terminal ERROR state, discarding pending buffers.
   * @private
   */
  #emitError(msg) {
    if (this.#state === STATE.ERROR) return;
    const path = this.#getCurrentPath().join(".") || "root";
    const error = new Error(`${msg} em path ${path}`);
    this.#state = STATE.ERROR;
    this.#buffer = "";
    this.#valueBuffer = "";
    this.#stringChunks = [];
    this.#emit({ type: "error", error });
  }

  /** @private Appends extracted content to the JSON buffer and parses as far as possible. */
  #parseJsonChunk(jsonContent) {
    // Once the top-level value is complete (or failed), further content is never
    // consumed; don't let it accumulate in the buffer.
    if (this.#state >= STATE.DONE) return;
    this.#buffer += jsonContent;
    this.#parse();
  }

  /** @private Runs the state machine over the buffered text until it is exhausted or finished. */
  #parse() {
    while (this.#pos < this.#buffer.length && this.#state < STATE.DONE) {
      const char = this.#buffer[this.#pos];
      // Whitespace is insignificant only between tokens. Inside a string it is
      // content; after a number/literal it terminates the token (so pieces on
      // either side of it are never glued together).
      const insideToken =
        this.#state === STATE.IN_STRING ||
        this.#state === STATE.IN_NUMBER ||
        this.#state === STATE.IN_LITERAL;
      if (!insideToken && /\s/.test(char)) {
        this.#pos++;
        continue;
      }
      this.#processChar(char);
    }
    if (this.#pos > 0) {
      this.#buffer = this.#buffer.slice(this.#pos);
      this.#pos = 0;
    }
  }

  /**
   * Path formed by the defined `keyOrIndex` of each frame.
   * @private
   * @param {Array<{keyOrIndex: (string|number|undefined)}>} [frames=this.#stack]
   * @returns {(string|number)[]}
   */
  #getCurrentPath(frames = this.#stack) {
    return frames
      .filter((f) => f.keyOrIndex !== undefined)
      .map((f) => f.keyOrIndex);
  }

  /** @private Index the next element of array `frame` will receive. */
  #nextIndex(frame) {
    return (frame.keyOrIndex ?? -1) + 1;
  }

  /** @private Dispatches the current character according to the parser state. */
  #processChar(char) {
    switch (this.#state) {
      case STATE.AWAITING_VALUE:
      case STATE.IN_OBJECT_AWAITING_VALUE:
      case STATE.IN_ARRAY_AWAITING_VALUE:
        this.#handleAwaitValue(char);
        break;
      case STATE.IN_OBJECT_AWAITING_KEY:
        if (char === '"') {
          this.#pos++;
          this.#state = STATE.IN_STRING;
        } else if (char === "}") {
          this.#closeContainer();
        } else {
          this.#emitError(
            `Esperava '"' para uma chave ou '}' mas recebeu '${char}'`
          );
        }
        break;
      case STATE.IN_OBJECT_AWAITING_COLON:
        if (char === ":") {
          this.#pos++;
          this.#state = STATE.IN_OBJECT_AWAITING_VALUE;
        } else {
          this.#emitError(`Esperava ':' mas recebeu '${char}'`);
        }
        break;
      case STATE.IN_OBJECT_AWAITING_SEPARATOR:
        if (char === ",") {
          this.#pos++;
          this.#state = STATE.IN_OBJECT_AWAITING_KEY;
        } else if (char === "}") {
          this.#closeContainer();
        } else {
          this.#emitError(`Esperava ',' ou '}' mas recebeu '${char}'`);
        }
        break;
      case STATE.IN_ARRAY_AWAITING_SEPARATOR:
        if (char === ",") {
          this.#pos++;
          this.#state = STATE.IN_ARRAY_AWAITING_VALUE;
        } else if (char === "]") {
          this.#closeContainer();
        } else {
          this.#emitError(`Esperava ',' ou ']' mas recebeu '${char}'`);
        }
        break;
      case STATE.IN_STRING:
        this.#handleInString();
        break;
      case STATE.IN_NUMBER:
        this.#handleSimpleValue("number");
        break;
      case STATE.IN_LITERAL:
        this.#handleSimpleValue("literal");
        break;
      default:
        this.#emitError("Estado desconhecido do parser.");
    }
  }

  /**
   * Starts the value beginning at `char`. Number/literal starts only switch
   * state (the character is consumed by #handleSimpleValue). A `]` here closes
   * an empty array (or one with a trailing comma).
   * @private
   */
  #handleAwaitValue(char) {
    if (char === '"') {
      this.#pos++;
      this.#state = STATE.IN_STRING;
    } else if (char === "{") {
      this.#openContainer("object");
    } else if (char === "[") {
      this.#openContainer("array");
    } else if (/[0-9\-]/.test(char)) {
      this.#state = STATE.IN_NUMBER;
    } else if (/[tfn]/.test(char)) {
      this.#state = STATE.IN_LITERAL;
    } else if (char === "]" && this.#stack.at(-1)?.type === "array") {
      this.#closeContainer();
    } else {
      this.#emitError(`Caractere inesperado '${char}' ao aguardar um valor`);
    }
  }

  /**
   * Pushes a new container frame, emits objectStart/arrayStart and consumes
   * the opening bracket.
   * @private
   * @param {ContainerType} type
   */
  #openContainer(type) {
    const parentFrame = this.#stack.at(-1);
    let parentState;
    if (!parentFrame) {
      parentState = STATE.DONE;
    } else if (parentFrame.type === "object") {
      parentState = STATE.IN_OBJECT_AWAITING_SEPARATOR;
    } else {
      parentState = STATE.IN_ARRAY_AWAITING_SEPARATOR;
      // Commit the new index BEFORE building the path so it replaces, rather
      // than follows, the previous sibling's index.
      parentFrame.keyOrIndex = this.#nextIndex(parentFrame);
    }
    const path = this.#getCurrentPath();
    this.#stack.push({ type, keyOrIndex: undefined, parentState });
    this.#emit({
      type: type === "object" ? "objectStart" : "arrayStart",
      path,
    });
    this.#state =
      type === "object"
        ? STATE.IN_OBJECT_AWAITING_KEY
        : STATE.IN_ARRAY_AWAITING_VALUE;
    this.#pos++;
  }

  /**
   * Pops the innermost container, emits objectEnd/arrayEnd at the same path as
   * its start event, restores the parent's state and consumes the bracket.
   * @private
   */
  #closeContainer() {
    const frame = this.#stack.pop();
    if (!frame) {
      this.#emitError(`Tentativa de fechar um container que não foi aberto.`);
      return;
    }
    // Computed after popping so the closed container's own key/index is excluded.
    const path = this.#getCurrentPath();
    this.#emit({
      type: frame.type === "object" ? "objectEnd" : "arrayEnd",
      path,
    });
    this.#state = frame.parentState;
    const parentFrame = this.#stack.at(-1);
    if (parentFrame?.type === "object") {
      // The member's value is complete; the object now awaits ',' or '}'.
      parentFrame.keyOrIndex = undefined;
    }
    this.#pos++;
  }

  /**
   * Consumes string characters until the closing quote or the end of the
   * buffer. Partial pieces are emitted as `valueChunk`s (raw, still escaped)
   * and kept in `#stringChunks`, so strings can span any number of transform
   * calls. On the closing quote the whole string is decoded with JSON.parse.
   * @private
   */
  #handleInString() {
    const start = this.#pos;
    while (this.#pos < this.#buffer.length) {
      const char = this.#buffer[this.#pos];
      if (this.#isEscaped) {
        this.#isEscaped = false;
      } else if (char === "\\") {
        this.#isEscaped = true;
      } else if (char === '"') {
        const chunk = this.#buffer.substring(start, this.#pos);
        if (chunk) {
          this.#emitValueChunk(chunk);
          this.#stringChunks.push(chunk);
        }
        this.#pos++; // Consume the closing quote.
        try {
          const finalValue = JSON.parse(`"${this.#stringChunks.join("")}"`);
          this.#finalizeValue(finalValue, "string");
        } catch (e) {
          this.#emitError(`Sequência de string inválida: ${e.message}`);
        } finally {
          this.#stringChunks = [];
        }
        return;
      }
      this.#pos++;
    }
    // The string continues in a later chunk: keep what we have so far.
    const chunk = this.#buffer.substring(start, this.#pos);
    if (chunk) {
      this.#emitValueChunk(chunk);
      this.#stringChunks.push(chunk);
    }
  }

  /**
   * Accumulates number/literal characters. Any other character (including
   * whitespace) terminates the token; reaching the end of the buffer waits for
   * more input.
   * @private
   * @param {'number' | 'literal'} finalType
   */
  #handleSimpleValue(finalType) {
    const regex = finalType === "number" ? /[0-9\.eE\+\-]/ : /[a-z]/;
    const start = this.#pos;
    while (
      this.#pos < this.#buffer.length &&
      regex.test(this.#buffer[this.#pos])
    ) {
      this.#pos++;
    }
    this.#valueBuffer += this.#buffer.substring(start, this.#pos);
    if (this.#pos === this.#buffer.length) return; // Token may continue in the next chunk.
    this.#finalizeSimpleValue(finalType);
  }

  /**
   * Decodes the accumulated number/literal text and reports it, or emits an
   * error if it is not valid JSON. Always clears `#valueBuffer`.
   * @private
   * @param {'number' | 'literal'} finalType
   */
  #finalizeSimpleValue(finalType) {
    const text = this.#valueBuffer;
    this.#valueBuffer = "";
    let value;
    try {
      value = JSON.parse(text);
    } catch {
      this.#emitError(`Falha ao parsear valor ${finalType}: '${text}'`);
      return;
    }
    this.#finalizeValue(value, finalType);
  }

  /**
   * Emits a `valueChunk` for a piece of a string VALUE (object keys are not
   * reported as chunks).
   * @private
   * @param {string} chunk - Raw, still JSON-escaped source text.
   */
  #emitValueChunk(chunk) {
    const parentFrame = this.#stack.at(-1);
    if (parentFrame?.type === "object" && parentFrame.keyOrIndex === undefined) {
      return; // This string is a key.
    }
    let path;
    if (parentFrame?.type === "array") {
      // The element's index is only committed in #finalizeValue; use the index it
      // will receive, appended to the enclosing frames' path (which must not
      // include this array's previous index).
      path = [
        ...this.#getCurrentPath(this.#stack.slice(0, -1)),
        this.#nextIndex(parentFrame),
      ];
    } else {
      path = this.#getCurrentPath();
    }
    this.#emit({ type: "valueChunk", path, chunk });
  }

  /**
   * Reports a completed scalar: as the key of the current object member, as a
   * member/element value, or as the whole document (which ends parsing).
   * @private
   * @param {*} value - Decoded value.
   * @param {string} valueType - "string", "number" or "literal" (informational).
   */
  #finalizeValue(value, valueType) {
    const parent = this.#stack.at(-1);
    if (!parent) {
      this.#emit({ type: "value", path: [], value });
      this.#state = STATE.DONE;
      return;
    }
    if (parent.type === "object") {
      if (parent.keyOrIndex === undefined) {
        parent.keyOrIndex = value;
        this.#emit({ type: "key", path: this.#getCurrentPath(), value });
        this.#state = STATE.IN_OBJECT_AWAITING_COLON;
      } else {
        this.#emit({ type: "value", path: this.#getCurrentPath(), value });
        parent.keyOrIndex = undefined;
        this.#state = STATE.IN_OBJECT_AWAITING_SEPARATOR;
      }
    } else if (parent.type === "array") {
      parent.keyOrIndex = this.#nextIndex(parent);
      this.#emit({ type: "value", path: this.#getCurrentPath(), value });
      this.#state = STATE.IN_ARRAY_AWAITING_SEPARATOR;
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment