Created
August 25, 2018 09:22
-
-
Save tulios/0a6010e51c42560a9ec47c002eb2481a to your computer and use it in GitHub Desktop.
kafkajs snappy codec
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { promisify } = require('util') | |
const snappy = require('snappy') | |
const snappyCompress = promisify(snappy.compress) | |
const snappyDecompress = promisify(snappy.uncompress) | |
const XERIAL_HEADER = Buffer.from([130, 83, 78, 65, 80, 80, 89, 0]) | |
const SIZE_BYTES = 4 | |
const SIZE_OFFSET = 16 | |
const isFrameFormat = buffer => buffer.slice(0, 8).equals(XERIAL_HEADER) | |
module.exports = () => ({ | |
async compress(encoder) { | |
return snappyCompress(encoder.buffer) | |
}, | |
// Based on ttps://github.com/eapache/go-xerial-snappy/blob/master/snappy.go#L110 | |
async decompress(buffer) { | |
if (!isFrameFormat(buffer)) { | |
return snappyDecompress(buffer) | |
} | |
const encoded = [] | |
const maxBytes = Buffer.byteLength(buffer) | |
let offset = SIZE_OFFSET | |
while (offset + SIZE_BYTES <= maxBytes) { | |
const size = buffer.readUInt32BE(offset) | |
offset += SIZE_BYTES | |
encoded.push(buffer.slice(offset, offset + size)) | |
offset += size | |
} | |
const decodedBuffers = await Promise.all( | |
encoded.map(async encodedBuffer => snappyDecompress(encodedBuffer)) | |
) | |
return decodedBuffers.reduce( | |
(result, decodedBuffer) => Buffer.concat([result, decodedBuffer]), | |
Buffer.alloc(0) | |
) | |
}, | |
}) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment