Created
July 6, 2021 21:15
-
-
Save sandfox/656a17aa0e241d046ac593fc4ad07d97 to your computer and use it in GitHub Desktop.
nodejs newline stream
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { Transform } = require("stream"); | |
// Byte value of "\n" (LF). In UTF-8 this byte never occurs inside a multi-byte
// sequence, so splitting raw buffers on it cannot cut a character in half.
const NEWLINE_BYTE = 0x0a;
// Default per-line limit; this default aligns with cloudwatch logs max size
// for a single event.
const DEFAULT_MAX_LINE_LENGTH_BYTES = 256 * 1024;

/**
 * Transform stream that splits incoming buffers on the "\n" byte and pushes
 * each line (without its terminating newline) downstream as a separate Buffer.
 *
 * If a line grows longer than `maxLineLengthBytes`, that line is discarded:
 * all further input is dropped until the next newline, after which normal
 * splitting resumes. A final line that is not terminated by a newline is
 * emitted when the stream ends, provided it is within the limit.
 *
 * Only Buffer input is accepted; a write that reaches `_transform` with any
 * other encoding fails the stream with an error.
 *
 * TODO: emit errors on overlong lines?
 */
class NewlineStreamSplitter extends Transform {
  /**
   * @param {object} [streamOpts] - Options passed unchanged to the Transform constructor.
   * @param {object} [opts]
   * @param {number} [opts.maxLineLengthBytes=262144] - Maximum line length in
   *   bytes, newline excluded. A line of exactly this length is still emitted.
   *   Any falsy value (including 0) selects the default.
   */
  constructor(streamOpts, opts) {
    super(streamOpts);
    const { maxLineLengthBytes } = opts || {};
    this._maxLineLengthBytes =
      maxLineLengthBytes || DEFAULT_MAX_LINE_LENGTH_BYTES;
    // Retained pieces of the current, not yet newline-terminated, line.
    this._buffers = [];
    // Bytes seen for the current line so far, including pieces that were not
    // retained because the line had already exceeded the limit.
    this._currentSize = 0;
  }

  // True once the current line is longer than the configured limit.
  _isLimitExceeded() {
    return this._currentSize > this._maxLineLengthBytes;
  }

  /**
   * Splits `chunk` on newlines, emitting every line it completes and keeping
   * any trailing partial line for the next chunk (or for `_flush`).
   *
   * @param {Buffer} chunk
   * @param {string} encoding - Must be "buffer"; anything else errors the stream.
   * @param {Function} callback
   */
  _transform(chunk, encoding, callback) {
    if (encoding !== "buffer") {
      callback(new Error("This only supports buffer mode streams"));
      return;
    }
    let marker = 0;
    // Compare against the full length: the chunk's final byte must be
    // examined too, otherwise a trailing one-byte line fragment or a newline
    // arriving as the last (or only) byte of a chunk is silently lost.
    while (marker < chunk.length) {
      const idx = chunk.indexOf(NEWLINE_BYTE, marker);
      if (idx === -1) {
        // No newline: the remainder of the chunk continues the current line.
        this._append(chunk.subarray(marker));
        break;
      }
      this._append(chunk.subarray(marker, idx));
      marker = idx + 1;
      this._endLine();
    }
    callback();
  }

  /**
   * Emits the pending unterminated line, if there is one and it is within the
   * limit, then resets state. When nothing is pending (e.g. the input ended
   * with a newline) nothing is pushed, so no spurious empty line appears.
   */
  _flush(callback) {
    if (this._buffers.length > 0 && !this._isLimitExceeded()) {
      this._flushBuffer();
    }
    this._clear();
    callback();
  }

  // Adds a piece of the current line, retaining it only while the line is
  // within the limit.
  _append(part) {
    this._currentSize += part.length;
    if (this._isLimitExceeded()) {
      // Nothing held for an overlong line will ever be emitted, so release it now.
      this._buffers = [];
    } else {
      this._buffers.push(part);
    }
  }

  // Handles a newline: emits the current line if it is within limits,
  // otherwise discards it. Either way, state is reset for the next line.
  _endLine() {
    if (this._isLimitExceeded()) {
      this._clear();
    } else {
      this._flushBuffer();
    }
  }

  // drop all internal state
  _clear() {
    this._buffers = [];
    this._currentSize = 0;
  }

  // Pushes the retained pieces downstream as one (copied) Buffer and resets state.
  _flushBuffer() {
    this.push(Buffer.concat(this._buffers));
    this._clear();
  }
}
// Public API of this module: the newline-splitting Transform stream.
module.exports = { NewlineStreamSplitter: NewlineStreamSplitter };
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const assert = require("assert"); | |
const { NewlineStreamSplitter } = require("./newline-stream-splitter"); | |
// Two newline-separated copies of "some-data" must come out as exactly two
// chunks, each equal to "some-data". The second copy has no trailing newline,
// so it is only emitted when the stream ends.
const testSplitter = () => {
  const testLine = Buffer.from("some-data", "utf8");
  const nl = Buffer.from("\n", "utf8");
  const testBuffer = Buffer.concat([testLine, nl, testLine]);
  const splitter = new NewlineStreamSplitter(undefined, {
    maxLineLengthBytes: 20,
  });
  const outputChunks = [];
  splitter.on("data", (b) => {
    outputChunks.push(b);
  });
  // 'data' events are delivered on a later tick, not during write()/end(),
  // so the output can only be checked once the readable side has ended.
  splitter.on("end", () => {
    outputChunks.forEach((c, i) => {
      assert.ok(
        Buffer.compare(c, testLine) === 0,
        `expected chunk@${i} to equal "some-data" buffer`
      );
    });
    assert.strictEqual(outputChunks.length, 2, "output should have 2 chunks");
  });
  splitter.write(testBuffer);
  splitter.end();
};
testSplitter();
// With a 4-byte limit both lines of the input are overlong, so the splitter
// must never emit a single chunk. Any 'data' event fails the test.
const testSkipsOverlengthLines = () => {
  const splitter = new NewlineStreamSplitter(undefined, { maxLineLengthBytes: 4 });
  const input = Buffer.from("oh noe I am too long \n i am also too long", "utf8");

  splitter.on("data", () => assert.fail("no buffers should be emitted"));

  splitter.write(input);
  splitter.end();
};
testSkipsOverlengthLines();
// With a 4-byte limit only the first (overlong) line is dropped. The short
// second line "----" must still be emitted, exactly once.
const testSkipsOnlyOverLengthLines = () => {
  const testLine = Buffer.from("oh noe I am too long \n----", "utf8");
  const splitter = new NewlineStreamSplitter(undefined, {
    maxLineLengthBytes: 4,
  });
  const outputChunks = [];
  splitter.on("data", (b) => {
    outputChunks.push(b);
  });
  // 'data' events are delivered on a later tick, not during write()/end(),
  // so the output can only be checked once the readable side has ended.
  splitter.on("end", () => {
    outputChunks.forEach((c) => {
      assert.ok(
        Buffer.compare(c, Buffer.from("----", "utf8")) === 0,
        `expected buffer to equal "----" buffer`
      );
    });
    assert.strictEqual(outputChunks.length, 1, "output should have 1 chunks");
  });
  splitter.write(testLine);
  splitter.end();
};
testSkipsOnlyOverLengthLines();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment