Skip to content

Instantly share code, notes, and snippets.

@foxt
Last active April 2, 2025 13:12
Show Gist options
  • Save foxt/94f217926c9966db4e269eaa6d825301 to your computer and use it in GitHub Desktop.
Save foxt/94f217926c9966db4e269eaa6d825301 to your computer and use it in GitHub Desktop.
Streaming CSV parser using the Web Streams API
/* eslint-disable @typescript-eslint/no-non-null-assertion */
/**
 * Incremental CSV parser implementing the Web Streams `Transformer` contract.
 *
 * Input chunks may be strings, `ArrayBuffer`s or any `ArrayBufferView`
 * (Node.js `Buffer` is a `Uint8Array` subclass and is therefore covered).
 * Binary chunks are decoded as UTF-8 in streaming mode, so a multi-byte
 * sequence split across two chunks is decoded correctly.
 *
 * Parsing rules:
 * - `,` separates fields and `\n` terminates a row, unless inside quotes.
 * - A field may be wrapped in double quotes; `""` inside quotes is a literal `"`.
 * - Every field is trimmed (this also strips the `\r` of CRLF line endings)
 *   and converted to a number when its text round-trips exactly through
 *   `parseFloat` (so `1.50` or `007` stay strings). This applies to quoted
 *   fields as well.
 *
 * Output: when `withHeaders` is true the first row supplies the keys and each
 * later row is enqueued as an object; otherwise each row is enqueued as an
 * array of fields.
 *
 * NOTE: the output type in the `implements` clause is kept as-is so existing
 * callers see the same stream type, although each enqueued chunk is a single
 * row (object or array), not an array of records.
 */
class _CsvDecodeStream implements Transformer<any, Record<string, string | number>[]> {
  /** Fields collected so far for the row currently being parsed. */
  line: (string | number)[] = [];
  /** Raw text of the field currently being parsed. */
  buffer = "";
  /** True while the parser is inside a double-quoted field. */
  isQuoted = false;
  /**
   * True when the previous character was a `"` seen inside a quoted field.
   * Whether that quote closes the field or starts a `""` escape is only
   * known once the next character arrives.
   */
  wasQuote = false;
  /** Header row, captured from the first parsed row when `withHeaders` is set. */
  headers: (string | number)[] | null = null;
  /** UTF-8 decoder shared by all binary chunks so partial sequences carry over. */
  td = new TextDecoder();

  /**
   * @param withHeaders When true (default), the first row is consumed as the
   *   header row and later rows are emitted as objects keyed by it.
   */
  constructor(private withHeaders = true) {}

  /**
   * Emits the row accumulated in `line` (or stores it as the header row) and
   * starts a fresh row.
   *
   * @param controller Stream controller that receives the finished row.
   */
  sendLine(controller: TransformStreamDefaultController) {
    const row = this.line;
    this.line = [];

    if (!this.withHeaders) {
      controller.enqueue(row);
      return;
    }
    if (this.headers === null) {
      this.headers = row;
      return;
    }

    const headers = this.headers;
    controller.enqueue(
      Object.fromEntries(
        row.map((value, i): [string | number, string | number] => {
          const header = headers[i];
          // Fall back to the column index only when there is no usable header.
          // A numeric header `0` is a valid key and must not be replaced.
          const key = header === undefined || header === "" ? i : header;
          return [key, value];
        }),
      ),
    );
  }

  /**
   * Finishes the current field: trims it, converts it to a number when the
   * trimmed text is exactly the canonical string form of that number, appends
   * it to the current row and clears the field buffer.
   */
  addBuffer() {
    const text = this.buffer.trim();
    const parsed = parseFloat(text);
    if (!isNaN(parsed) && text === parsed.toString()) this.line.push(parsed);
    else this.line.push(text);
    this.buffer = "";
  }

  /**
   * Advances the parser state machine by one character.
   *
   * @param char Single character (UTF-16 code unit) of decoded input.
   * @param controller Stream controller used when a row is completed.
   */
  private parseChar(char: string, controller: TransformStreamDefaultController) {
    if (char === '"') {
      if (!this.isQuoted) {
        // Opening quote of a quoted field.
        this.isQuoted = true;
      } else if (this.wasQuote) {
        // Second quote of a `""` pair: emit one literal quote.
        this.buffer += '"';
        this.wasQuote = false;
      } else {
        // Either the closing quote or the first half of `""`; decide on the next char.
        this.wasQuote = true;
      }
      return;
    }

    // A pending quote followed by a non-quote character was the closing quote.
    if (this.isQuoted && this.wasQuote) this.isQuoted = false;
    this.wasQuote = false;

    if (!this.isQuoted) {
      if (char === ",") {
        this.addBuffer();
        return;
      }
      if (char === "\n") {
        this.addBuffer();
        this.sendLine(controller);
        return;
      }
    }
    this.buffer += char;
  }

  /**
   * Decodes one incoming chunk and feeds it through the parser.
   *
   * @param chunk A string, `ArrayBuffer` or `ArrayBufferView`.
   * @param controller Stream controller that receives completed rows.
   * @throws Error if the chunk is of any other type.
   */
  transform(chunk: unknown, controller: TransformStreamDefaultController) {
    let data: string;
    if (typeof chunk === "string") {
      data = chunk;
    } else if (chunk instanceof ArrayBuffer || ArrayBuffer.isView(chunk)) {
      // `stream: true` keeps an incomplete trailing UTF-8 sequence inside the
      // decoder until the next chunk completes it.
      data = this.td.decode(chunk, { stream: true });
    } else {
      throw new Error("Unsupported chunk type: " + typeof chunk);
    }
    for (let i = 0; i < data.length; i++) this.parseChar(data.charAt(i), controller);
  }

  /**
   * Called when the input ends: drains the text decoder and emits the final
   * row if the input did not end with a newline.
   *
   * @param controller Stream controller that receives the final row, if any.
   */
  flush(controller: TransformStreamDefaultController) {
    // Drain bytes still held by the streaming decoder (e.g. a truncated sequence).
    const tail = this.td.decode();
    for (let i = 0; i < tail.length; i++) this.parseChar(tail.charAt(i), controller);

    // A field is pending when text is buffered, when earlier fields of the row
    // exist (so the input ended right after a comma), or when a quoted field
    // was opened (covers a final empty `""` field).
    const fieldPending = this.buffer.length > 0 || this.line.length > 0 || this.isQuoted;
    if (fieldPending) {
      // Go through addBuffer so the last field is trimmed and number-converted
      // exactly like every other field.
      this.addBuffer();
      this.sendLine(controller);
    }
  }
}
/**
 * Builds a `TransformStream` that parses CSV input into rows.
 *
 * @param withHeaders When true (default), the first row is treated as the
 *   header row and later rows are emitted as objects keyed by it; when false,
 *   each row is emitted as an array of fields.
 * @returns A new `TransformStream` backed by a fresh `_CsvDecodeStream` parser.
 */
export function CsvDecodeStream(withHeaders = true) {
  const parser = new _CsvDecodeStream(withHeaders);
  return new TransformStream(parser);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment