Last active
January 9, 2024 04:27
-
-
Save breezewish/f061a60f2b0dfacca05d71688d532026 to your computer and use it in GitHub Desktop.
Protobuf.js 6+ Stream Decoder
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import through2 from 'through2'; | |
import { BufferReader } from 'protobufjs'; | |
/**
 * Creates an object-mode transform stream that splits an incoming byte stream
 * into varint-length-prefixed frames and decodes each frame with `decodeFunc`.
 *
 * Incoming chunks are accumulated in one preallocated buffer of `msgMaxSize`
 * bytes. A chunk larger than the free space is fed in slices, so any number of
 * small messages may arrive in a single chunk. A single frame (length prefix
 * plus body) that cannot fit in the buffer is reported as a `RangeError`.
 *
 * After the first error the stream stops decoding and silently drops all
 * further chunks.
 *
 * @param {(reader: BufferReader) => *} decodeFunc - Decodes one message body.
 *   It is called with the shared reader positioned at the start of the body
 *   and `reader.len` set to the end of the body. Its return value is pushed
 *   downstream. Any error it throws is forwarded to the stream callback.
 * @param {number} [msgMaxSize=20000000] - Size in bytes of the internal
 *   buffer, i.e. the largest frame (prefix included) that can be decoded.
 * @returns {*} The transform stream returned by `through2.obj`.
 */
function decodeProtobuf(decodeFunc, msgMaxSize = 20 * 1000 * 1000) {
  const buffer = Buffer.alloc(msgMaxSize);
  const reader = new BufferReader(buffer);
  reader.pos = 0;
  // Number of valid bytes at the front of `buffer`; may span several messages.
  let bufferLen = 0;
  let hasError = false;

  return through2.obj(function (chunk, enc, callback) {
    if (hasError) {
      return callback();
    }

    let chunkPos = 0;
    while (chunkPos < chunk.length) {
      // `copy` stops at the end of `buffer`; advance by what was really
      // copied so no bytes are lost when the chunk exceeds the free space.
      const copied = chunk.copy(buffer, bufferLen, chunkPos);
      chunkPos += copied;
      bufferLen += copied;

      while (reader.pos < bufferLen) {
        // Remember where this frame starts so we can rewind if it is incomplete.
        const frameStart = reader.pos;
        let messageLen;
        try {
          // Extend the reader's view to all valid data to read the prefix.
          reader.len = bufferLen;
          messageLen = reader.uint32();
        } catch (e) {
          // The length prefix is incomplete; wait for more data.
          reader.pos = frameStart;
          break;
        }

        const messageEnd = reader.pos + messageLen;
        if (messageEnd > bufferLen) {
          // Body (or prefix) runs past the valid data; rewind and retry later.
          reader.pos = frameStart;
          break;
        }

        try {
          // Shrink the view to this message so the decoder cannot read into
          // the next frame.
          reader.len = messageEnd;
          this.push(decodeFunc(reader));
        } catch (e) {
          // Decoding the message body failed.
          hasError = true;
          return callback(e);
        }
        // Keep framing aligned even if the decoder did not consume every byte.
        reader.pos = messageEnd;
      }

      if (reader.pos > 0) {
        // Something was decoded: move the undecoded tail to the front.
        buffer.copy(buffer, 0, reader.pos, bufferLen);
        bufferLen -= reader.pos;
        reader.pos = 0;
      } else if (bufferLen === buffer.length) {
        // The buffer is full and still holds no complete frame, so the
        // pending frame can never fit.
        hasError = true;
        return callback(
          new RangeError(
            `Length-prefixed message does not fit in the ${buffer.length}-byte buffer (msgMaxSize)`,
          ),
        );
      }
    }

    callback();
  });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment