Skip to content

Instantly share code, notes, and snippets.

@Xunnamius
Last active April 28, 2023 10:10
Show Gist options
  • Save Xunnamius/9b78aad3fb975a07a78efee4f8021ba3 to your computer and use it in GitHub Desktop.
Save Xunnamius/9b78aad3fb975a07a78efee4f8021ba3 to your computer and use it in GitHub Desktop.
Playing around with node streams and the stream-json package
/* eslint-disable no-console */
import { setTimeout as delay } from 'node:timers/promises';
import { Readable, Writable } from 'node:stream';
import { createWriteStream, renameSync } from 'node:fs';
import { writeFile } from 'node:fs/promises';
import { randomBytes } from 'node:crypto';
import { default as StreamFork } from 'stream-fork';
import { default as StreamChain } from 'stream-chain';
import { default as StreamJson } from 'stream-json';
import { default as StreamJsonDisassembler } from 'stream-json/Disassembler.js';
import { default as StreamJsonStringer } from 'stream-json/Stringer.js';
import { default as StreamJsonStreamValues } from 'stream-json/streamers/StreamValues.js';
import { default as StreamJsonStreamArray } from 'stream-json/streamers/StreamArray.js';
import { default as StreamJsonStreamObject } from 'stream-json/streamers/StreamObject.js';
import { default as StreamJsonPick } from 'stream-json/filters/Pick.js';
import { default as StreamJsonIgnore } from 'stream-json/filters/Ignore.js';
import { default as StreamJsonReplace } from 'stream-json/filters/Replace.js';
import { default as StreamJsonBatch } from 'stream-json/utils/Batch.js';
const { chain } = StreamChain;
const { parser } = StreamJson;
const { disassembler } = StreamJsonDisassembler;
const { stringer } = StreamJsonStringer;
const { streamValues } = StreamJsonStreamValues;
const { streamArray } = StreamJsonStreamArray;
const { streamObject } = StreamJsonStreamObject;
const { pick } = StreamJsonPick;
const { ignore } = StreamJsonIgnore;
const { replace } = StreamJsonReplace;
const { batch } = StreamJsonBatch;
const { fork } = StreamFork;
// ---- Experiment switches ----------------------------------------------------
// true: the pipeline consists of `chainedStreams` only.
// false: an extra sink (log stream, or stringer + output file) is appended
// after `chainedStreams` when the pipeline is built at the bottom of the file.
const useOnlyChainedStreams = true;
// Forwarded to makeLogStream() as its `reduce` argument: when true the log
// stream also collects every chunk and prints the combined output at the end.
const collectOutput = false;
// Only consulted when useOnlyChainedStreams is false: true selects the console
// log stream as the sink, false selects stringer -> 'ignore.output.json'.
const useLogStream = false;
// Number of sample objects requested from makeObjectGeneratorStream().
const numberObjToGenerate = 5;
// * (use with) B, D
/**
* Options passed to every stringer() stage created in this file.
* NOTE(review): the meaning of each flag is defined by stream-json, not by this
* file — check the Stringer documentation before changing them.
* @type {StreamJsonStringer.StringerOptions}
*/
const stringerOptions = {
makeArray: true,
useValues: true,
useStringValues: false
};
// * (use with) C
/**
* @type {StreamJsonStringer.StringerOptions}
*/
// const stringerOptions = {
// makeArray: true
// };
// * A (useLogStream=true)
// * Without stream-json
// const chainedStreams = [(x) => ({ ...x, a: `changed! (was ${x.a})` })];
// * B
// * For when string values might be too big for memory
// const chainedStreams = [
// disassembler({
// streamValues: false,
// streamStrings: true,
// packValues: true,
// packStrings: false
// }),
// // ? Ignore filter needs packed keys
// ignore({ filter: 'c' })
// ];
// * C
// * Filtering on individual object prop values when objects can fit in memory
// const chainedStreams = [
// disassembler(),
// ignore({ filter: 'c' }),
// // ? Streamers need packed values
// streamValues(),
// (data) => (data.value.index % 2 === 1 ? data.value : null),
// disassembler()
// ];
/**
 * Per-property packing state, keyed by property name. Each entry is an
 * `[isPacking, isPacked, packedValue]` tuple. Consumers reset all state with
 * `packedValuesMap.clear()` once an object has been fully handled.
 */
const packedValuesMap = new Map();

/**
 * Feeds one token into the packer for `property` and reports whether that
 * property's value has been completely assembled yet.
 *
 * State machine per property:
 *  1. idle      - waiting for a `keyValue` token whose value is `property`;
 *  2. packing   - `*Chunk` tokens are concatenated onto the packed value;
 *  3. packed    - reached on the first `end*` token, or immediately when the
 *                 value arrives as a single packed `*Value` token (for example
 *                 `numberValue` or `stringValue`). Further tokens are ignored.
 *
 * Handling the single-token case matters: without it a non-streamed value
 * would leave the packer stuck in "packing", so chunks belonging to later,
 * unrelated properties would be appended to this one.
 *
 * @param {{ name: string, value?: unknown }} token The token to inspect.
 * @param {string} property Name of the object key whose value is packed.
 * @returns {[boolean, string | undefined]} `[isPacked, packedValue]`; the
 * value is `undefined` until packing has completed.
 */
const packProperty = function ({ name, value }, property) {
  let [isPacking, isPacked, packedValue] = packedValuesMap.get(property) ?? [
    false,
    false,
    ''
  ];

  if (!isPacked) {
    if (name === 'keyValue' && value === property) {
      isPacking = true;
    } else if (isPacking && name.endsWith('Chunk')) {
      packedValue += value;
    } else if (isPacking && name.startsWith('end')) {
      isPacking = false;
      isPacked = true;
    } else if (isPacking && name !== 'keyValue' && name.endsWith('Value')) {
      // The whole value arrived in one token; there will be no chunks or
      // end marker to wait for.
      packedValue = String(value);
      isPacking = false;
      isPacked = true;
    }
  }

  packedValuesMap.set(property, [isPacking, isPacked, packedValue]);
  return [isPacked, isPacked ? packedValue : undefined];
};
// * D (useOnlyChainedStreams=true)
// * Filtering on individual object prop values without loading strings into mem
//
// Stage list handed to chain() when the pipeline is built. Incoming objects are
// disassembled into tokens and then forked into two branches:
//   1. everything except property `c` is re-stringified into
//      'ignore.output.json';
//   2. property `c` is handled token-by-token so that the string under
//      `c.bytes` is written straight to its own file, named after `c.id` and
//      `c.type`.
// NOTE(review): the handler below relies on keys arriving as `keyValue` tokens
// and strings arriving as `startString`/`stringChunk`/`endString` tokens —
// presumably what these disassembler options produce; confirm against the
// stream-json Disassembler documentation before changing them.
const chainedStreams = [
disassembler({
streamValues: false,
streamStrings: true,
packValues: true,
packStrings: false
}),
fork([
// Branch 1: drop `c`, serialize the rest to a single JSON file.
chain([
ignore({ filter: 'c' }),
stringer(stringerOptions),
createWriteStream('ignore.output.json')
]),
// Branch 2: keep only `c` and stream its `bytes` value to an attachment file.
chain([
pick({ filter: 'c' }),
// Called once per token. `this` is the private object created by `.bind({})`
// at the end of the function; it holds `outputStream` between calls.
async function (data) {
// Final attachment file name for a given id/type pair.
const getRealFilename = (packedId, packedType) => {
return `ignore.attachment-${packedId}.${packedType}`;
};
// Every token is offered to both packers so `id` and `type` are captured
// regardless of where they appear relative to `bytes`.
const [idIsPacked, packedId] = packProperty(data, 'id');
const [typeIsPacked, packedType] = packProperty(data, 'type');
if (data.name === 'keyValue' && data.value === 'bytes') {
// Start of the `bytes` value: open the destination file. If `id` and `type`
// are already known the final name is used directly; otherwise a randomly
// named temporary file is used and renamed further down.
this.outputStream =
idIsPacked && typeIsPacked
? createWriteStream(getRealFilename(packedId, packedType))
: createWriteStream(
'ignore.tmp-' + randomBytes(4).readUInt32LE(0).toString(36)
);
// A file error tears down the module-level `pipeline` (defined later in the file).
this.outputStream.on('error', (error) => pipeline.destroy(error));
} else if (this.outputStream) {
if (this.outputStream.writable) {
// File still open: `endString` finishes it, `stringChunk` appends to it.
// Any other token is ignored in this state.
if (data.name === 'endString') {
this.outputStream.end();
} else if (data.name === 'stringChunk') {
// Respect backpressure: when write() returns false, wait for 'drain'
// before this call resolves.
await new Promise((resolve) => {
if (!this.outputStream.write(data.value)) {
this.outputStream.once('drain', () => resolve());
} else {
resolve();
}
});
}
} else if (idIsPacked && typeIsPacked) {
// File already ended and both `id` and `type` are now known: give the
// file its final name if it was opened under a temporary one.
const path = this.outputStream.path;
const finalizeCommit = () => {
renameSync(path, `./ignore.attachment-${packedId}.${packedType}`);
};
if (!path.endsWith(getRealFilename(packedId, packedType))) {
// Rename right away if the stream has closed, otherwise once it closes.
if (this.outputStream.closed) {
finalizeCommit();
} else {
this.outputStream.on('close', () => finalizeCommit());
}
}
// Reset all per-object state so the next object starts from scratch.
packedValuesMap.clear();
this.outputStream = undefined;
}
}
}.bind({})
])
])
];
/**
 * Creates an object-mode Readable that emits sample objects, one roughly
 * every 250 ms, and then ends.
 *
 * Objects at even indices list their keys as `a, b, c, index` with `c` laid
 * out as `id, type, bytes`; objects at odd indices use `index, a, c, b` with
 * `c` laid out as `bytes, type, id`. The differing key order lets consumers be
 * exercised with `bytes` appearing both after and before `id`/`type`.
 *
 * The stream is built with `Readable.from()` so that an error thrown by the
 * generator surfaces as a stream 'error' event (instead of an unhandled
 * promise rejection from an async `read()`), and so that destroying the
 * stream also finalizes the generator.
 *
 * @param {number} [numberObjToGenerate=10] How many objects to emit.
 * @returns {Readable} Object-mode readable stream of the generated objects.
 */
function makeObjectGeneratorStream(numberObjToGenerate = 10) {
  // Short pseudo-random base-36 identifier; uniqueness is not guaranteed.
  const makeRandomId = () => (Math.random() + 1).toString(36).slice(7);

  async function* makeObjectGenerator(count) {
    for (let index = 0; index < count; index++) {
      // Simulate a slow producer.
      // eslint-disable-next-line no-await-in-loop
      await delay(250);

      yield index % 2 === 0
        ? {
            a: 1,
            b: 'data',
            c: {
              id: makeRandomId(),
              type: 'txt',
              bytes: 'huge-big-data-file'
            },
            index
          }
        : {
            index,
            a: 'one',
            c: {
              bytes: 'huge-big-data-file',
              type: 'md',
              id: makeRandomId()
            },
            b: 'zata'
          };
    }
  }

  // Readable.from() defaults to objectMode: true.
  return Readable.from(makeObjectGenerator(numberObjToGenerate));
}
/**
 * Creates an object-mode Writable that prints every chunk it receives.
 *
 * @param {boolean} [reduce=false] When true, chunks are additionally kept in
 * memory and a combined "total output" is printed once the stream finishes:
 * the chunks joined together if all of them were strings, otherwise the chunk
 * list rendered as pretty-printed JSON.
 * @returns {Writable} The logging sink.
 */
function makeLogStream(reduce = false) {
  const collected = [];
  let onlyStringsSoFar = true;

  // Renders everything collected so far for the final summary line.
  const renderCollected = () => {
    if (onlyStringsSoFar) {
      return collected.join('');
    }
    return JSON.stringify(collected, undefined, 2);
  };

  return new Writable({
    objectMode: true,

    write(chunk, encoding, callback) {
      if (!reduce) {
        console.log(`chunk (${encoding}):`, chunk);
        callback(null);
        return;
      }

      collected.push(chunk);
      if (typeof chunk !== 'string') {
        onlyStringsSoFar = false;
      }
      console.log(`(collected) chunk (${encoding}):`, chunk);
      callback(null);
    },

    final(callback) {
      if (reduce) {
        console.log(`total output:\n${renderCollected()}`);
      }
      callback(null);
    }
  });
}
// ---- Wire everything together and run the experiment -------------------------
console.log('stream test started');

// Extra sink stages, only built when `chainedStreams` is not used on its own:
// either the console log stream or stringer -> 'ignore.output.json'.
const makeSinkStages = () => {
  if (useLogStream) {
    return [makeLogStream(collectOutput)];
  }
  return [stringer(stringerOptions), createWriteStream('ignore.output.json')];
};

const pipelineStages = useOnlyChainedStreams
  ? chainedStreams
  : [...chainedStreams, ...makeSinkStages()];

// `pipeline` is also referenced from inside `chainedStreams` to abort on file
// errors, so the binding name must stay as is.
const pipeline = chain(pipelineStages);

const reportPipelineError = (error) => console.error('final error handler:', error);
const reportPipelineEnd = () => console.log('stream test ended');

pipeline.on('error', reportPipelineError);
makeObjectGeneratorStream(numberObjToGenerate).pipe(pipeline);
pipeline.on('end', reportPipelineEnd);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment