Skip to content

Instantly share code, notes, and snippets.

@qwtel
Created July 11, 2024 05:43
Show Gist options
  • Save qwtel/5fd74f1fa827e12527ee918ecfc926f8 to your computer and use it in GitHub Desktop.
Save qwtel/5fd74f1fa827e12527ee918ecfc926f8 to your computer and use it in GitHub Desktop.
// import { TransformStream } from 'node:stream/web';
import { Buffer } from 'node:buffer';
import { Packr, Unpackr, Options } from 'msgpackr';
export class PackrTransformStream extends TransformStream<any, Uint8Array> {
constructor(options: Options & { packr?: Packr } = {}) {
options = { ...options, sequential: true };
const packr = options.packr || new Packr(options);
super({
transform(chunk, controller) {
controller.enqueue(packr.pack(chunk));
}
});
}
}
export class UnpackrTransformStream extends TransformStream<Uint8Array, any> {
constructor(options: Options & { unpackr?: Unpackr } = {}) {
options = { ...options, structures: [] };
const unpackr = options.unpackr || new Unpackr(options);
let incompleteBuffer: Uint8Array|null = null;
super({
transform(chunk, controller) {
if (incompleteBuffer) {
chunk = Buffer.concat([incompleteBuffer, chunk]);
incompleteBuffer = null;
}
let values;
try {
values = unpackr.unpackMultiple(chunk);
} catch (err) {
const error = err as { incomplete: boolean, lastPosition: number, values: any[] };
if (error.incomplete) {
incompleteBuffer = chunk.subarray(error.lastPosition);
values = error.values;
} else {
controller.error(error);
}
} finally {
if (values) {
for (const value of values) {
controller.enqueue(value);
}
}
}
}
});
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment