Test out the performance and memory usage of various ways to compose a gzip of streaming JSON log lines.
/**
 * Test out the performance and memory usage of various ways to compose a gzip
 * of streaming JSON log lines.
 *
 * One fun property of gzip is that a stream with multiple gzip chunks is
 * itself valid as a single gzip stream/file. That means there are lots of
 * interesting ways to break down how you build and stream out gzip data.
 */
const fs = require("fs");
const stream = require("stream");
const { pipeline } = require("stream/promises");
const zlib = require('zlib');
const { Buffer } = require('buffer');
const { promisify } = require('util');
const JSONStream = require("JSONStream");
const split2 = require("split2");
const streamChunker = require('stream-chunker');

// A reasonably large input file that matches the type of input we expect.
// ~3 GB of ND-JSON data. Most lines are pretty short (~150 bytes), but some
// are very long (~250 kB). All lines share the same structure, and lines of
// similar length tend to come together in chunks.
const inFilePath = "./data/univaf_raw/availability_log-2021-09-21.ndjson";

/**
 * Simplest implementation: just throw Node's built-in gzip stream in a stream
 * pipeline after we serialize the JSON.
 */
async function compressionSingleStream() {
  await pipeline(
    fs.createReadStream(inFilePath),
    split2(),
    async function* (source) {
      for await (const line of source) {
        if (line) {
          yield JSON.parse(line);
        }
      }
    },
    JSONStream.stringify(false),
    zlib.createGzip({ level: zlib.constants.Z_BEST_COMPRESSION }),
    fs.createWriteStream(`${inFilePath}.basicStream.gz`)
  );
}

/**
 * Possibly over-engineered: create a stream that outputs multiple gzips, one
 * after the other (which together form a valid gzip). There's a lot of messy
 * work here involved in making sure the whole thing continues to stream without
 * taking over a huge chunk of memory for the batching.
 */
async function compressionBatchSubStream() {
  class GzipBatchStream extends stream.Duplex {
    constructor(options) {
      super({
        readableObjectMode: false,
        writableObjectMode: true
      });
      this.createNewZipper();
      this.batchSize = options.batchSize || 10_000;
    }

    createNewZipper() {
      this.inputCount = 0;
      this.currentZipStream = zlib.createGzip({ level: zlib.constants.Z_BEST_COMPRESSION });
      this.currentZipStream.on("data", (chunk) => {
        if (!this.push(chunk)) {
          this.currentZipStream.pause();
        }
      });
    }

    _write(chunk, _encoding, callback) {
      return this.currentZipStream.write(JSON.stringify(chunk) + "\n", (error) => {
        if (error) {
          return callback(error);
        }

        this.inputCount++;
        if (this.inputCount === this.batchSize) {
          // Don't call back that we're done until the current batch stream has been consumed!
          this.currentZipStream.once("end", () => {
            this.createNewZipper();
            callback();
          });
          this.currentZipStream.end();
        } else {
          callback();
        }
      });
    }

    _final(callback) {
      this.currentZipStream.once("end", () => {
        this.currentZipStream = null;
        callback();
      });
      this.currentZipStream.end();
    }

    _read(_size) {
      if (this.currentZipStream) {
        this.currentZipStream.resume();
      }
    }
  }

  await pipeline(
    fs.createReadStream(inFilePath),
    split2(),
    async function* (source) {
      for await (const line of source) {
        if (line) {
          yield JSON.parse(line);
        }
      }
    },
    new GzipBatchStream({ batchSize: 10_000 }),
    fs.createWriteStream(`${inFilePath}.batchSubStream.gz`)
  );
}

/**
 * Simple batching: buffer up a large chunk of serialized output and then gzip
 * the whole chunk and output that on the stream. Like
 * `compressionBatchSubStream`, this winds up creating an output stream that is
 * a bunch of gzips one after the other (which together also form a valid gzip.)
 * This ought to take a lot more memory, and maybe also be slower. We'll see.
 */
async function compressionBatchChunks() {
  await pipeline(
    fs.createReadStream(inFilePath),
    split2(),
    async function* (source) {
      for await (const line of source) {
        if (line) {
          yield JSON.parse(line);
        }
      }
    },
    async function* (source) {
      const gzipPromise = promisify(zlib.gzip);
      let batch = "";
      let batchSize = 0;
      for await (const row of source) {
        batchSize++;
        batch += JSON.stringify(row) + "\n";
        if (batchSize === 10_000) {
          yield await gzipPromise(batch, { level: zlib.constants.Z_BEST_COMPRESSION });
          batch = "";
          batchSize = 0;
        }
      }
      if (batch.length) {
        yield await gzipPromise(batch, { level: zlib.constants.Z_BEST_COMPRESSION });
        batch = "";
        batchSize = 0;
      }
    },
    fs.createWriteStream(`${inFilePath}.batchChunkStream.gz`)
  );
}

/**
 * Like `compressionBatchChunks`, but separates the batching from the gzipping.
 * One stream outputs batches of 10,000 lines, and its output is consumed by a
 * normal gzip stream, rather than the single shot `gzip()` call for each batch.
 */
async function compressionBatchChunks2(batchSize) {
  await pipeline(
    fs.createReadStream(inFilePath),
    split2(),
    async function* (source) {
      for await (const line of source) {
        if (line) {
          yield JSON.parse(line);
        }
      }
    },
    async function* (source) {
      for await (const row of source) {
        yield JSON.stringify(row) + "\n";
      }
    },
    async function* (source) {
      let buffer = "";
      let bufferSize = 0;
      for await (const row of source) {
        bufferSize++;
        buffer += row;
        if (bufferSize === batchSize) {
          yield buffer;
          buffer = "";
          bufferSize = 0;
        }
      }
      yield buffer;
    },
    zlib.createGzip({ level: zlib.constants.Z_BEST_COMPRESSION }),
    fs.createWriteStream(`${inFilePath}.batchChunkStream2.gz`)
  );
}

/**
 * Like `compressionBatchChunks2`, but batches by bytes instead of by line.
 */
async function compressionBatchChunks2Bytes(batchSize) {
  await pipeline(
    fs.createReadStream(inFilePath),
    split2(),
    async function* (source) {
      for await (const line of source) {
        if (line) {
          yield JSON.parse(line);
        }
      }
    },
    async function* (source) {
      for await (const row of source) {
        yield Buffer.from(JSON.stringify(row) + "\n", "utf8");
      }
    },
    async function* (source) {
      let buffer = Buffer.allocUnsafe(batchSize);
      let bufferPosition = 0;
      for await (const input of source) {
        let inputPosition = 0;
        while (inputPosition < input.length) {
          const written = input.copy(buffer, bufferPosition, inputPosition);
          inputPosition += written;
          bufferPosition += written;
          if (bufferPosition === batchSize) {
            yield buffer;
            buffer = Buffer.alloc(batchSize);
            bufferPosition = 0;
          }
        }
      }
      // Emit any leftovers.
      if (bufferPosition > 0) {
        yield buffer.slice(0, bufferPosition);
      }
    },
    zlib.createGzip({ level: zlib.constants.Z_BEST_COMPRESSION }),
    fs.createWriteStream(`${inFilePath}.batchChunkStream2Bytes.gz`)
  );
}

/**
 * Like `compressionBatchChunks2Bytes`, but with proper streams instead of
 * async generators.
 */
async function compressionBatchChunks2BytesProper({ batchSize, setHighWaterMark = false, maxMemLevel = false, setChunkSize = false, setGzipHighWaterMark = false }) {
  // Couldn't find a good version of this on NPM (seems surprising, I'm probably
  // missing it). But the `stream-chunker` package performs *terribly* (it's
  // worse than no chunking at all!)
  class BufferedStream extends stream.Transform {
    constructor ({ size = 256 * 1024, setHighWaterMark = false } = {}) {
      const options = {};
      if (setHighWaterMark) options.readableHighWaterMark = size;
      super(options);
      this.size = size;
      this.resetBuffer();
    }

    resetBuffer () {
      this.buffer = Buffer.allocUnsafe(this.size);
      this.offset = 0;
    }

    _transform (input, encoding, callback) {
      if (typeof input === "string") {
        input = Buffer.from(input, encoding);
      } else if (!(input instanceof Buffer)) {
        callback(new TypeError(`BufferedStream input must be strings or buffers, not ${input.constructor.name}`));
        return;
      }

      let inputPosition = 0;
      while (inputPosition < input.length) {
        const written = input.copy(this.buffer, this.offset, inputPosition);
        inputPosition += written;
        this.offset += written;
        if (this.offset === this.size) {
          this.push(this.buffer);
          this.resetBuffer();
        }
      }
      callback();
    }

    _flush (callback) {
      if (this.offset > 0) {
        this.push(this.buffer.slice(0, this.offset));
      }
      callback();
    }

    _destroy (error, callback) {
      this.buffer = null;
      callback(error);
    }
  }

  await pipeline(
    fs.createReadStream(inFilePath),
    split2(),
    async function* (source) {
      for await (const line of source) {
        if (line) {
          yield JSON.parse(line);
        }
      }
    },
    JSONStream.stringify(false),
    new BufferedStream({ size: batchSize, setHighWaterMark }),
    zlib.createGzip({
      level: zlib.constants.Z_BEST_COMPRESSION,
      memLevel: maxMemLevel ? zlib.constants.Z_MAX_MEMLEVEL : zlib.constants.Z_DEFAULT_MEMLEVEL,
      chunkSize: setChunkSize ? batchSize : undefined,
      highWaterMark: setGzipHighWaterMark ? batchSize : undefined
    }),
    fs.createWriteStream(`${inFilePath}.batchChunkStream2BytesProper.gz`)
  );
}

/**
 * Like `compressionBatchChunks2Bytes`, but with a third-party component
 * (stream-chunker).
 */
async function compressionBatchChunks2Bytes3p(batchSize) {
  await pipeline(
    fs.createReadStream(inFilePath),
    split2(),
    async function* (source) {
      for await (const line of source) {
        if (line) {
          yield JSON.parse(line);
        }
      }
    },
    JSONStream.stringify(false),
    streamChunker(batchSize, { flush: true, align: false }),
    zlib.createGzip({ level: zlib.constants.Z_BEST_COMPRESSION }),
    fs.createWriteStream(`${inFilePath}.batchChunkStream2Bytes3p.gz`)
  );
}

async function main() {
  const sizeArgument = process.argv.find(x => x.startsWith("--size="));
  const batchSize = sizeArgument && parseInt(sizeArgument.match(/=(.*)$/)?.[1], 10) || 10_000;
  console.log("Batch size:", batchSize);
  const maxMemLevel = process.argv.includes('--max-mem-level');
  console.log("maxMemLevel:", maxMemLevel);
  const setHighWaterMark = process.argv.includes('--set-high-water-mark');
  console.log("setHighWaterMark:", setHighWaterMark);
  const setChunkSize = process.argv.includes('--set-chunk-size');
  console.log("setChunkSize:", setChunkSize);
  const setGzipHighWaterMark = process.argv.includes('--set-gzip-high-water-mark');
  console.log("setGzipHighWaterMark:", setGzipHighWaterMark);

  // Print memory usage every few seconds. This is optional so we can try a few
  // runs without it and make sure it's not impacting timing.
  if (process.argv.includes("--show-memory")) {
    const formatter = new Intl.NumberFormat();
    console.log("RSS\tHeap Total\tHeap Used\tExternal\tArrayBuffers");
    setInterval(() => {
      const usage = process.memoryUsage();
      console.log([
        formatter.format(usage.rss).padStart(11, " "),
        formatter.format(usage.heapTotal).padStart(11, " "),
        formatter.format(usage.heapUsed).padStart(11, " "),
        formatter.format(usage.external).padStart(11, " "),
        formatter.format(usage.arrayBuffers).padStart(11, " "),
      ].join("\t"));
    }, 5_000).unref();
  }

  if (process.argv.includes('single-stream')) {
    await compressionSingleStream();
  }
  if (process.argv.includes('batch-sub-stream')) {
    await compressionBatchSubStream();
  }
  if (process.argv.includes('batch-chunk-stream')) {
    await compressionBatchChunks();
  }
  if (process.argv.includes('batch-chunk-stream-2')) {
    await compressionBatchChunks2(batchSize);
  }
  if (process.argv.includes('batch-chunk-stream-2-bytes')) {
    await compressionBatchChunks2Bytes(batchSize);
  }
  if (process.argv.includes('batch-chunk-stream-2-bytes-proper')) {
    await compressionBatchChunks2BytesProper({
      batchSize,
      maxMemLevel,
      setHighWaterMark,
      setChunkSize,
      setGzipHighWaterMark
    });
  }
  if (process.argv.includes('batch-chunk-stream-2-bytes-3p')) {
    await compressionBatchChunks2Bytes3p(batchSize);
  }
}

main().catch(error => {
  console.log(error);
  process.exitCode = 1;
});
I came back again for one more round of testing on this.
Since `compressionBatchChunks2` taught me that we really don't need to do any chunking of the resulting gzips to get low-memory streaming and compression, it stands to reason that memory usage could be optimized further if we batch by bytes instead of by number of lines. The added tests here look at that, and (unsurprisingly) it improves memory usage in general and also makes the range between minimum and maximum memory usage much smaller: the memory footprint stays generally consistent.

In the new code, I've added 3 methods:
- `compressionBatchChunks2Bytes` works similarly to `compressionBatchChunks2`, except it creates buffers of N bytes instead of strings of N lines.

- `compressionBatchChunks2BytesProper` is the same as the above, but the batching code is written as a transform stream instead of as an async generator. I was curious whether the async generators supported by `stream.pipeline()` imposed any extra overhead (or maybe even had less overhead) compared to normal streams, but writing the code as a stream also allowed me to test some optimizations around high water marks (which you can't set for the generator). Those optimizations are:

  - Setting `highWaterMark` on the batching stream to match the size of the batches. This had no notable impact. (On the other hand, I can imagine how this might make a difference if data is coming into the batching stream more slowly.)
  - Setting `memLevel` on the gzip stream to the maximum value (that is, `9` instead of `8`). This maybe gives some minor speed improvements when the batch size is 64 kB – 128 kB, but it seems just as likely that this is an artifact of the particular data file I was testing with. (I don't think it was a matter of not doing enough runs to average out random jitter, since the same pattern repeats in the next test.)
  - Setting both `highWaterMark` on the batching stream and `memLevel` on the gzip stream. This performs basically the same as just using `memLevel`.
  - Setting `chunkSize` to match the batch size (the default is 16 kB) in addition to the above optimizations. This had a small but consistent speed improvement for batch sizes above 64 kB. In reality, I think what's happening here is that the `highWaterMark` for filesystem streams is 64 kB, so this is really more about how few reads are needed to fill the next stream's buffer, rather than anything about gzipping. In any case, this does mean that optimizing the gzip stream for the high water mark of the next stream can have a pretty big impact on overall performance, which is good to keep in mind.
  - Setting `highWaterMark` on the gzip stream in addition to the previous optimization. This had no notable impact.

  It's worth noting that none of the above optimizations made a consistent or notable impact on memory usage.

  One other minor advantage here is that the async generators don't work with the old-style stream created by `JSONStream`, while an actual stream object does. This isn't a huge deal, though.

- `compressionBatchChunks2Bytes3p` uses the `stream-chunker` package to batch up the data instead of custom code. It turns out to be incredibly inefficient, and actually makes the whole pipeline perform worse than anything else.

Overall, these approaches (excepting `compressionBatchChunks2Bytes3p`, which was just bad all around) improved both memory usage and time. I didn't have a whole lot of time to dig into this, but I'm guessing most of the speed improvement between buffering by line and buffering by bytes comes down to doing more string operations in JS, which would generally be a bit more expensive than operating on bytes in buffers. It could also be that I made a poor estimate of average bytes per line and the comparison isn't as apples-to-apples as I'd hoped. In any case, the memory improvements between them seem much more clear-cut.

One interesting thing here is that user time tends to be pretty consistent between the different optimizations, so what the optimizations are affecting is likely to be mainly about a) how well zlib is able to spread its work across cores without bottlenecking and b) how much time is spent shuttling memory/data around between JS and zlib.
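For anyone who wants to check that user-versus-wall split without reaching for the `time` command, here's a minimal sketch using `process.cpuUsage()` and `process.hrtime.bigint()`. The method and the 64 kB batch size are just example choices, and it needs to run inside an async context such as `main()`:

```js
const startWall = process.hrtime.bigint();
const startCpu = process.cpuUsage();

await compressionBatchChunks2Bytes(64 * 1024);

const wallMs = Number(process.hrtime.bigint() - startWall) / 1e6;
const cpu = process.cpuUsage(startCpu); // { user, system }, in microseconds
console.log(
  `wall: ${wallMs.toFixed(0)} ms, ` +
  `user: ${(cpu.user / 1000).toFixed(0)} ms, ` +
  `system: ${(cpu.system / 1000).toFixed(0)} ms`
);
```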
In most cases, performance seems pretty consistent with chunk sizes above 32 kB, and progressively slows down as the chunks get smaller. This makes sense, since the default `windowBits` for zlib is 15, which equates to a 32 kB data window to work with (note: the default is also the max; this can't be made bigger). So as soon as the batches get smaller than that, we start wasting time waiting for data to fill zlib's internal working buffer and things really start to crawl.

Finally, memory usage between all the different byte buffering approaches here was pretty consistent, so I only included numbers from one of the methods in the tables below.
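To put concrete numbers on the window-size point above, here's a quick sketch of the zlib knobs involved. The 64 kB `chunkSize` is an assumption based on the downstream file stream's buffer size in these tests; match it to whatever stream is actually reading from the gzip:

```js
const zlib = require("zlib");

// Default windowBits is 15 (also the max), i.e. a 2^15 = 32 kB history window.
// Batches smaller than this leave the window under-fed.
const windowSize = 1 << 15; // 32,768 bytes

const gzip = zlib.createGzip({
  level: zlib.constants.Z_BEST_COMPRESSION, // 9
  memLevel: zlib.constants.Z_MAX_MEMLEVEL,  // 9 (the default is 8)
  chunkSize: 64 * 1024,                     // default is 16 kB; match the next stream's highWaterMark
});
```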
## Overall Lessons Here
- Gzip streams can work over large streams of data pretty efficiently and really don't need much memory at all. You don't need to manually break up your data into multiple gzip blocks to get efficient streaming output (the sketch after this list shows why doing so still produces valid output, though). (I'm not sure if this is due to an implementation change from the early days, if I was just remembering something incorrectly, or if I'd never rigorously tested what I'd read about this way back early on. 🤷)
- Batching data before it arrives at a gzip stream can massively improve both gzip speed and memory usage. Unless the timing of each byte on your stream is very inconsistent and very slow, you should probably always have a stream batch up data before piping/writing it to a gzip stream.
- Batches should ideally be >= the window size you are using in zlib (by default, this is 32 kB). You can calculate this as `batchSizeBytes = 1 << windowBits`.
- Setting zlib's `memLevel` to the max did not make an appreciable difference on the data set I was working with.
- Setting the high water mark on a zlib stream doesn't make any noticeable difference.
- When possible, matching a zlib stream's `chunkSize` to the `highWaterMark` of whatever stream is reading from it can give a small but consistent speed boost.
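The multi-member property described at the top of the gist (and that `compressionBatchSubStream` and `compressionBatchChunks` rely on) is easy to check directly. A minimal sketch, with arbitrary example strings:

```js
const zlib = require("zlib");

// Two chunks compressed completely independently...
const first = zlib.gzipSync("line one\n");
const second = zlib.gzipSync("line two\n");

// ...still decompress as a single stream when concatenated, because a
// sequence of gzip members is itself a valid gzip stream.
const combined = Buffer.concat([first, second]);
console.log(zlib.gunzipSync(combined).toString());
// => "line one\nline two\n"
```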
## Some Tables with Performance Measurements

All data comes from samples across 4 runs of each method/optimization at each batch size. The timings in the table below are averages across all four runs.
This is the memory usage from all 4 runs of `compressionBatchChunks2BytesProper` with all the optimizations on. It's not appreciably different from any other combination of optimizations or from `compressionBatchChunks2Bytes`.