Skip to content

Instantly share code, notes, and snippets.

@Phoenix35
Last active July 19, 2022 21:04
Show Gist options
  • Save Phoenix35/5f64f33059c512ccd9c23a2b76eae571 to your computer and use it in GitHub Desktop.
Save Phoenix35/5f64f33059c512ccd9c23a2b76eae571 to your computer and use it in GitHub Desktop.
Node.js NDJSON (newline-delimited JSON) stream decoder
import { Transform } from "node:stream";
import { StringDecoder } from "node:string_decoder";
/** Line terminator that separates NDJSON records. */
const EOL = "\n";

/** Length of `EOL`; used to step past a terminator once its record is consumed. */
const EOLLength = EOL.length;

/**
 * Parses one NDJSON record and pushes it to the readable side of `stream`.
 *
 * @param {Transform} stream - The stream whose readable side receives the value.
 * @param {string} text - The raw text of a single, non-blank NDJSON line.
 * @throws {SyntaxError} If `text` is not valid JSON.
 * @throws {TypeError} If `text` is the JSON literal `null`.
 */
function pushNDJSONRecord (stream, text) {
  const value = JSON.parse(text);

  // In Node streams `push(null)` means "end of stream", so a literal `null`
  // record cannot be forwarded. Fail loudly instead of silently truncating
  // the output.
  if (value === null) {
    throw new TypeError(
      "NDJSON record `null` cannot be emitted: object-mode streams reserve null for end-of-stream"
    );
  }

  stream.push(value);
}

/**
 * Creates a Transform stream that decodes NDJSON (one JSON document per
 * "\n"-terminated line) into JavaScript values.
 *
 * The writable side accepts strings or Buffers; Buffers are decoded with a
 * `StringDecoder` so multi-byte characters split across chunks are handled.
 * The readable side is in object mode and emits one parsed value per line.
 * Blank (whitespace-only) lines are skipped, which also tolerates CRLF input
 * and a trailing newline. A final line without a terminator is parsed when
 * the writable side ends.
 *
 * The stream errors with the `SyntaxError` thrown by `JSON.parse` on an
 * invalid line, or with a `TypeError` when a line is the literal `null`.
 *
 * @param {string} [encoding="utf8"] - Character encoding used to decode Buffer chunks.
 * @returns {Transform} The NDJSON decoding stream.
 */
export function createParser (encoding = "utf8") {
  return new Transform({
    // Keep string chunks as strings; Buffers are decoded by hand in transform().
    decodeStrings: false,
    readableObjectMode: true,

    construct (cb) {
      // Text of the current, not yet terminated, line.
      this.bufferedData = "";
      this._decoder = new StringDecoder(encoding);
      cb();
    },

    transform (chunk, encoding, cb) {
      const text = encoding === "buffer" ? this._decoder.write(chunk) : chunk;

      // Index in `text` where the current (unconsumed) line begins.
      let lineStart = 0;

      for (;;) {
        const eolIndex = text.indexOf(EOL, lineStart);

        if (eolIndex === -1) {
          // No terminator left: keep only the unconsumed tail for the next chunk.
          this.bufferedData += text.slice(lineStart);
          break;
        }

        const line = this.bufferedData + text.slice(lineStart, eolIndex);
        this.bufferedData = "";
        lineStart = eolIndex + EOLLength;

        // Skip blank lines, consistent with how final() treats empty trailing data.
        if (line.trim() === "") {
          continue;
        }

        try {
          pushNDJSONRecord(this, line);
        } catch (err) {
          cb(err);
          return;
        }
      }

      cb();
    },

    final (cb) {
      // Flush any bytes the decoder is still holding, then parse the last
      // line if the input did not end with a terminator.
      const tail = this.bufferedData + this._decoder.end();
      this.bufferedData = "";

      if (tail.trim() === "") {
        cb();
        return;
      }

      try {
        pushNDJSONRecord(this, tail);
      } catch (err) {
        cb(err);
        return;
      }

      // Called outside the try block so cb is never invoked twice.
      cb();
    }
  });
}
// Usage example: pipe a readable stream of NDJSON text through the parser and
// log each decoded value. `readableStream` is not defined in this snippet and
// is assumed to be supplied by the surrounding code.
const parser = createParser();
const records = readableStream.pipe(parser);

for await (const record of records) {
  console.log(record);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment