Skip to content

Instantly share code, notes, and snippets.

@celsowm
Last active June 28, 2025 21:13
Show Gist options
  • Save celsowm/e6a72f636fc025067460621bd05084b4 to your computer and use it in GitHub Desktop.
Save celsowm/e6a72f636fc025067460621bd05084b4 to your computer and use it in GitHub Desktop.
LLMJsonStreamParser.js
/**
* @typedef {'object' | 'array'} ContainerType
*/
/**
* @typedef {Object} ParserEvent
* @property {'objectStart' | 'objectEnd' | 'arrayStart' | 'arrayEnd' | 'key' | 'valueChunk' | 'value' | 'done' | 'error'} type
* @property {(string|number)[]} [path]
* @property {*} [value]
* @property {string} [chunk]
* @property {Error} [error]
*/
/**
 * Parser states for the incremental JSON state machine.
 *
 * The numeric order matters: the parse loop only keeps running while the
 * current state is numerically below `DONE`, so `DONE` and `ERROR` must stay
 * the two largest values.
 *
 * Frozen so that no code in the module can accidentally reassign a state
 * number at runtime.
 */
const STATE = Object.freeze({
  AWAITING_VALUE: 0, // Root level, no value seen yet
  IN_OBJECT_AWAITING_KEY: 1,
  IN_OBJECT_AWAITING_COLON: 2,
  IN_OBJECT_AWAITING_VALUE: 3,
  IN_OBJECT_AWAITING_SEPARATOR: 4,
  IN_ARRAY_AWAITING_VALUE: 5,
  IN_ARRAY_AWAITING_SEPARATOR: 6,
  IN_STRING: 7,
  IN_NUMBER: 8,
  IN_LITERAL: 9, // true, false, null
  DONE: 10,
  ERROR: 11,
});
/**
 * A specialized JSON stream parser for Server-Sent Events (SSE) from LLM APIs.
 *
 * It handles the SSE wrapping and the inner JSON stream in a single pass:
 * every `data:` line is parsed as JSON, the string found at the configured
 * JSON Pointer is extracted, and that string is fed into an incremental JSON
 * state machine.
 *
 * Results are dispatched as `CustomEvent`s whose `type` is one of
 * `objectStart`, `objectEnd`, `arrayStart`, `arrayEnd`, `key`, `valueChunk`,
 * `value`, `done` or `error`; the event's `detail` is the `ParserEvent`
 * payload (`path`, `value`, `chunk` or `error`, depending on the type).
 *
 * `valueChunk` events carry the raw text of a string value as it appears in
 * the stream, i.e. escape sequences are NOT decoded; the decoded string is
 * delivered by the final `value` event.
 */
export class LLMJsonStreamParser extends EventTarget {
  // Character classes are hoisted so the per-character loop does not rebuild
  // a RegExp object for every character it inspects.
  static #WHITESPACE = /\s/;
  static #NUMBER_START = /[0-9\-]/;
  static #LITERAL_START = /[tfn]/;
  static #NUMBER_CHAR = /[0-9\.eE\+\-]/;
  static #LITERAL_CHAR = /[a-z]/;

  // --- SSE layer state ---
  /** Pointer segments, e.g. ["choices", "0", "delta", "content"]. */
  #jsonPointer;
  /** Tail of the SSE stream that has not been terminated by a newline yet. */
  #lineBuffer = "";
  /** Set once the `[DONE]` sentinel has been processed. */
  #isDone = false;

  // --- Core JSON parser state ---
  #state = STATE.AWAITING_VALUE;
  /** Unconsumed JSON text. */
  #buffer = "";
  /** Read position inside `#buffer`. */
  #pos = 0;
  /** True when the previous string character was an unconsumed backslash. */
  #isEscaped = false;
  /** Accumulates a number/literal token across chunks. */
  #valueBuffer = "";
  /** Open containers: `{ type, keyOrIndex, parentState }`, innermost last. */
  #stack = [];
  /** Raw pieces of the string currently being read, kept across `transform` calls. */
  #stringChunks = [];

  /**
   * @param {object} [options]
   * @param {string} [options.jsonPointer=/choices/0/delta/content] - A JSON Pointer to the string field
   * within each SSE `data` payload that contains the streaming JSON content.
   */
  constructor({ jsonPointer = "/choices/0/delta/content" } = {}) {
    super();
    this.#jsonPointer = jsonPointer.split("/").filter((p) => p);
  }

  /**
   * Processes an incoming chunk from the SSE stream.
   *
   * Complete lines are handled immediately; a trailing partial line is kept
   * until the next call (or until `flush()`). Calls are ignored once the
   * `[DONE]` sentinel was seen or the parser is in the error state.
   *
   * @param {string} chunk - The chunk of text from the stream.
   */
  transform(chunk) {
    if (this.#isDone || this.#state === STATE.ERROR) return;
    this.#lineBuffer += chunk;
    const lines = this.#lineBuffer.split(/\r?\n/);
    // The last element is the (possibly empty) unterminated tail.
    this.#lineBuffer = lines.pop() ?? "";
    for (const line of lines) {
      if (this.#isDone) break;
      this.#processLine(line);
    }
  }

  /**
   * Finalizes the parsing process. Should be called after the stream ends.
   *
   * A last SSE line that was not terminated by a newline is processed first,
   * so its payload is not lost. Afterwards an `error` event is emitted if the
   * JSON document is still incomplete.
   */
  flush() {
    if (
      !this.#isDone &&
      this.#state !== STATE.ERROR &&
      this.#lineBuffer.length > 0
    ) {
      const pendingLine = this.#lineBuffer;
      this.#lineBuffer = "";
      this.#processLine(pendingLine);
      // The pending line was the [DONE] sentinel: already finalized.
      if (this.#isDone) return;
    }
    this.#finishJson();
  }

  /**
   * Handles one complete SSE line. Empty lines, comment lines (`:`) and
   * fields other than `data:` are ignored.
   * @private
   */
  #processLine(line) {
    if (line.length === 0 || line.startsWith(":")) return;
    if (!line.startsWith("data:")) return;
    const payload = line.slice(5).trim();
    if (payload === "[DONE]") {
      this.#finishJson();
      this.#isDone = true;
      this.#emit({ type: "done" });
      return;
    }
    try {
      const sseJson = JSON.parse(payload);
      const content = this.#getValueByPointer(sseJson);
      if (typeof content === "string" && content.length > 0) {
        this.#parseJsonChunk(content);
      }
    } catch (e) {
      this.#emitError(
        `Falha ao parsear o payload do SSE: ${e.message}. Payload: "${payload}"`
      );
    }
  }

  /**
   * Closes the inner JSON document at end of stream.
   *
   * A root-level number or literal has no trailing delimiter, so it is still
   * pending in `#valueBuffer` here; it is finalized if it parses. Anything
   * else that is not complete is reported as an error.
   * @private
   */
  #finishJson() {
    let pendingType;
    if (this.#state === STATE.IN_NUMBER) pendingType = "number";
    else if (this.#state === STATE.IN_LITERAL) pendingType = "literal";

    // Only a root-level scalar can legitimately end the stream; inside a
    // container the document is incomplete regardless of the token.
    if (pendingType !== undefined && this.#stack.length === 0) {
      let parsed;
      let isValid = true;
      try {
        parsed = JSON.parse(this.#valueBuffer);
      } catch {
        // Truncated token (e.g. "tru" or "-"): report it as incomplete below.
        isValid = false;
      }
      if (isValid) {
        this.#valueBuffer = "";
        this.#finalizeValue(parsed, pendingType);
      }
    }

    if (
      this.#state === STATE.AWAITING_VALUE &&
      this.#buffer.trim().length === 0
    ) {
      this.#state = STATE.DONE;
      return;
    }
    if (this.#state !== STATE.DONE && this.#state !== STATE.AWAITING_VALUE) {
      this.#emitError("JSON incompleto no final do fluxo");
    }
  }

  /**
   * Walks `obj` along the configured pointer segments.
   * @returns {*} The value found, or `undefined` if any segment is missing.
   * @private
   */
  #getValueByPointer(obj) {
    let current = obj;
    for (const key of this.#jsonPointer) {
      if (
        current === null ||
        typeof current !== "object" ||
        !(key in current)
      ) {
        return undefined;
      }
      current = current[key];
    }
    return current;
  }

  // --- CORE JSON PARSER (ADAPTED) ---

  /**
   * Dispatches `event` as a CustomEvent named after `event.type`.
   * @private
   */
  #emit(event) {
    this.dispatchEvent(new CustomEvent(event.type, { detail: event }));
  }

  /**
   * Switches to the error state, drops all buffered input and emits a single
   * `error` event. Further calls while already in the error state are no-ops.
   * @private
   */
  #emitError(msg) {
    if (this.#state === STATE.ERROR) return;
    const path = this.#getCurrentPath().join(".") || "root";
    const error = new Error(`${msg} em path ${path}`);
    this.#state = STATE.ERROR;
    this.#buffer = "";
    this.#valueBuffer = "";
    this.#stringChunks = [];
    this.#emit({ type: "error", error });
  }

  /**
   * Appends JSON text to the buffer and advances the state machine.
   * @private
   */
  #parseJsonChunk(jsonContent) {
    // Once finished or failed the parse loop never consumes input again, so
    // buffering more text would only grow memory without bound.
    if (this.#state >= STATE.DONE) return;
    this.#buffer += jsonContent;
    this.#parse();
  }

  /**
   * Consumes as much of `#buffer` as possible, then drops the consumed part.
   * @private
   */
  #parse() {
    while (this.#pos < this.#buffer.length && this.#state < STATE.DONE) {
      const char = this.#buffer[this.#pos];
      // Whitespace is insignificant between tokens only. Inside a string it
      // is content, and inside a number/literal it terminates the token
      // (otherwise "1" + " 2" split across chunks would be read as 12).
      const isBetweenTokens =
        this.#state !== STATE.IN_STRING &&
        this.#state !== STATE.IN_NUMBER &&
        this.#state !== STATE.IN_LITERAL;
      if (isBetweenTokens && LLMJsonStreamParser.#WHITESPACE.test(char)) {
        this.#pos++;
        continue;
      }
      this.#processChar(char);
    }
    if (this.#pos > 0) {
      this.#buffer = this.#buffer.slice(this.#pos);
      this.#pos = 0;
    }
  }

  /**
   * @returns {(string|number)[]} Keys/indices of all open frames that currently have one.
   * @private
   */
  #getCurrentPath() {
    return this.#stack
      .filter((f) => f.keyOrIndex !== undefined)
      .map((f) => f.keyOrIndex);
  }

  /**
   * Dispatches one character according to the current state.
   * @private
   */
  #processChar(char) {
    switch (this.#state) {
      case STATE.AWAITING_VALUE:
      case STATE.IN_OBJECT_AWAITING_VALUE:
      case STATE.IN_ARRAY_AWAITING_VALUE:
        this.#handleAwaitValue(char);
        break;
      case STATE.IN_OBJECT_AWAITING_KEY:
        if (char === '"') {
          this.#pos++;
          this.#state = STATE.IN_STRING;
        } else if (char === "}") {
          this.#closeContainer();
        } else {
          this.#emitError(
            `Esperava '"' para uma chave ou '}' mas recebeu '${char}'`
          );
        }
        break;
      case STATE.IN_OBJECT_AWAITING_COLON:
        if (char === ":") {
          this.#pos++;
          this.#state = STATE.IN_OBJECT_AWAITING_VALUE;
        } else {
          this.#emitError(`Esperava ':' mas recebeu '${char}'`);
        }
        break;
      case STATE.IN_OBJECT_AWAITING_SEPARATOR:
        if (char === ",") {
          this.#pos++;
          this.#state = STATE.IN_OBJECT_AWAITING_KEY;
        } else if (char === "}") {
          this.#closeContainer();
        } else {
          this.#emitError(`Esperava ',' ou '}' mas recebeu '${char}'`);
        }
        break;
      case STATE.IN_ARRAY_AWAITING_SEPARATOR:
        if (char === ",") {
          this.#pos++;
          this.#state = STATE.IN_ARRAY_AWAITING_VALUE;
        } else if (char === "]") {
          this.#closeContainer();
        } else {
          this.#emitError(`Esperava ',' ou ']' mas recebeu '${char}'`);
        }
        break;
      case STATE.IN_STRING:
        this.#handleInString();
        break;
      case STATE.IN_NUMBER:
        this.#handleSimpleValue("number");
        break;
      case STATE.IN_LITERAL:
        this.#handleSimpleValue("literal");
        break;
      default:
        this.#emitError("Estado desconhecido do parser.");
    }
  }

  /**
   * Handles the first character of a value.
   * @private
   */
  #handleAwaitValue(char) {
    if (char === '"') {
      this.#pos++;
      this.#state = STATE.IN_STRING;
    } else if (char === "{") {
      this.#openContainer("object");
    } else if (char === "[") {
      this.#openContainer("array");
    } else if (LLMJsonStreamParser.#NUMBER_START.test(char)) {
      // Position is not advanced: the token reader consumes this character.
      this.#state = STATE.IN_NUMBER;
    } else if (LLMJsonStreamParser.#LITERAL_START.test(char)) {
      this.#state = STATE.IN_LITERAL;
    } else if (char === "]" && this.#stack.at(-1)?.type === "array") {
      // Covers the empty array, and also accepts a trailing comma.
      this.#closeContainer();
    } else {
      this.#emitError(`Caractere inesperado '${char}' ao aguardar um valor`);
    }
  }

  /**
   * Pushes a new container frame and emits `objectStart` / `arrayStart`.
   * @param {ContainerType} type
   * @private
   */
  #openContainer(type) {
    const parentFrame = this.#stack.at(-1);
    // State to restore once this container is closed.
    const parentState = parentFrame
      ? parentFrame.type === "object"
        ? STATE.IN_OBJECT_AWAITING_SEPARATOR
        : STATE.IN_ARRAY_AWAITING_SEPARATOR
      : STATE.DONE;
    let path = this.#getCurrentPath();
    if (parentFrame?.type === "array") {
      // The new container occupies the next index of the parent array.
      const newIndex = (parentFrame.keyOrIndex ?? -1) + 1;
      parentFrame.keyOrIndex = newIndex;
      path = [...path, newIndex];
    }
    const frame = { type, keyOrIndex: undefined, parentState };
    this.#stack.push(frame);
    this.#emit({
      type: type === "object" ? "objectStart" : "arrayStart",
      path,
    });
    this.#state =
      type === "object"
        ? STATE.IN_OBJECT_AWAITING_KEY
        : STATE.IN_ARRAY_AWAITING_VALUE;
    this.#pos++;
  }

  /**
   * Pops the innermost container and emits `objectEnd` / `arrayEnd`.
   * @private
   */
  #closeContainer() {
    const frame = this.#stack.at(-1);
    if (!frame)
      return this.#emitError(
        `Tentativa de fechar um container que não foi aberto.`
      );
    this.#stack.pop();
    // Computed after the pop so the path identifies the container itself,
    // matching the path of its start event. Before the pop, an array frame
    // still holds the index of its last element, which must not leak into
    // the end event's path.
    const containerPath = this.#getCurrentPath();
    this.#emit({
      type: frame.type === "object" ? "objectEnd" : "arrayEnd",
      path: containerPath,
    });
    this.#state = frame.parentState;
    const parentFrame = this.#stack.at(-1);
    if (parentFrame?.type === "object") {
      // The container was the value of the current key; the key is now done.
      parentFrame.keyOrIndex = undefined;
    }
    this.#pos++;
  }

  /**
   * Reads string content up to the closing quote or the end of the buffer.
   *
   * Raw pieces are collected in `#stringChunks` (a class field, so a string
   * may span several `transform` calls) and decoded with `JSON.parse` once
   * the closing quote is found.
   * @private
   */
  #handleInString() {
    const start = this.#pos;
    while (this.#pos < this.#buffer.length) {
      const char = this.#buffer[this.#pos];
      if (this.#isEscaped) {
        this.#isEscaped = false;
      } else if (char === "\\") {
        this.#isEscaped = true;
      } else if (char === '"') {
        const chunk = this.#buffer.substring(start, this.#pos);
        if (chunk) {
          this.#emitValueChunk(chunk);
          this.#stringChunks.push(chunk);
        }
        this.#pos++; // Consume final quote
        try {
          const finalValue = JSON.parse(`"${this.#stringChunks.join("")}"`);
          this.#finalizeValue(finalValue, "string");
        } catch (e) {
          this.#emitError(`Sequência de string inválida: ${e.message}`);
        } finally {
          this.#stringChunks = []; // Reset for the next string
        }
        return;
      }
      this.#pos++;
    }
    // String did not end. Store partial chunk.
    const chunk = this.#buffer.substring(start, this.#pos);
    if (chunk) {
      this.#emitValueChunk(chunk);
      this.#stringChunks.push(chunk);
    }
  }

  /**
   * Reads a number or literal token. The token is finalized only when a
   * following character proves it has ended; at the end of the buffer the
   * partial token is kept in `#valueBuffer` until more data arrives.
   * @param {'number' | 'literal'} finalType
   * @private
   */
  #handleSimpleValue(finalType) {
    const regex =
      finalType === "number"
        ? LLMJsonStreamParser.#NUMBER_CHAR
        : LLMJsonStreamParser.#LITERAL_CHAR;
    const start = this.#pos;
    while (
      this.#pos < this.#buffer.length &&
      regex.test(this.#buffer[this.#pos])
    ) {
      this.#pos++;
    }
    const chunk = this.#buffer.substring(start, this.#pos);
    if (chunk) this.#valueBuffer += chunk;
    if (this.#pos === this.#buffer.length) return; // Wait for more data
    try {
      const value = JSON.parse(this.#valueBuffer);
      this.#finalizeValue(value, finalType);
    } catch (e) {
      this.#emitError(
        `Falha ao parsear valor ${finalType}: '${this.#valueBuffer}'`
      );
    } finally {
      this.#valueBuffer = "";
    }
  }

  /**
   * Emits a `valueChunk` event for a raw piece of a string value. Pieces of
   * object keys (object frame without a current key) are not reported.
   * @private
   */
  #emitValueChunk(chunk) {
    const parentFrame = this.#stack.at(-1);
    if (
      !parentFrame ||
      parentFrame.type === "array" ||
      parentFrame.keyOrIndex !== undefined
    ) {
      let path;
      if (parentFrame?.type === "array") {
        // The element index is only committed when the value is finalized,
        // so the upcoming index is computed here without storing it.
        const newIndex = (parentFrame.keyOrIndex ?? -1) + 1;
        path = [...this.#getCurrentPath(), newIndex];
      } else {
        path = this.#getCurrentPath();
      }
      this.#emit({ type: "valueChunk", path, chunk });
    }
  }

  /**
   * Emits `key` or `value` for a completed scalar and moves to the state
   * that follows it in the enclosing container.
   * @private
   */
  #finalizeValue(value, valueType) {
    const parent = this.#stack.at(-1);
    if (!parent) {
      // Root-level scalar: the document is complete.
      this.#emit({ type: "value", path: [], value });
      this.#state = STATE.DONE;
      return;
    }
    if (parent.type === "object") {
      if (parent.keyOrIndex === undefined) {
        // No current key yet, so this string is the key itself.
        parent.keyOrIndex = value;
        this.#emit({ type: "key", path: this.#getCurrentPath(), value });
        this.#state = STATE.IN_OBJECT_AWAITING_COLON;
      } else {
        this.#emit({ type: "value", path: this.#getCurrentPath(), value });
        parent.keyOrIndex = undefined;
        this.#state = STATE.IN_OBJECT_AWAITING_SEPARATOR;
      }
    } else if (parent.type === "array") {
      parent.keyOrIndex = (parent.keyOrIndex ?? -1) + 1;
      this.#emit({ type: "value", path: this.#getCurrentPath(), value });
      this.#state = STATE.IN_ARRAY_AWAITING_SEPARATOR;
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment