Playing around with node streams and the stream-json package: a toy pipeline that generates objects on a timer and runs them through four scenario variants (A–D) of filtering and re-serialization, including forking the token stream so large string values can be spilled to attachment files without ever being held in memory.
/* eslint-disable no-console */
import { setTimeout as delay } from 'node:timers/promises';
import { Readable, Writable } from 'node:stream';
import { createWriteStream, renameSync } from 'node:fs';
import { writeFile } from 'node:fs/promises';
import { randomBytes } from 'node:crypto';
import { default as StreamFork } from 'stream-fork';
import { default as StreamChain } from 'stream-chain';
import { default as StreamJson } from 'stream-json';
import { default as StreamJsonDisassembler } from 'stream-json/Disassembler.js';
import { default as StreamJsonStringer } from 'stream-json/Stringer.js';
import { default as StreamJsonStreamValues } from 'stream-json/streamers/StreamValues.js';
import { default as StreamJsonStreamArray } from 'stream-json/streamers/StreamArray.js';
import { default as StreamJsonStreamObject } from 'stream-json/streamers/StreamObject.js';
import { default as StreamJsonPick } from 'stream-json/filters/Pick.js';
import { default as StreamJsonIgnore } from 'stream-json/filters/Ignore.js';
import { default as StreamJsonReplace } from 'stream-json/filters/Replace.js';
import { default as StreamJsonBatch } from 'stream-json/utils/Batch.js';
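// ? stream-json, stream-chain, and stream-fork ship as CommonJS, which is
// ? presumably why their default exports are imported above and destructured
// ? below instead of relying on Node's named-export detection for CJS modules.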
const { chain } = StreamChain;
const { parser } = StreamJson;
const { disassembler } = StreamJsonDisassembler;
const { stringer } = StreamJsonStringer;
const { streamValues } = StreamJsonStreamValues;
const { streamArray } = StreamJsonStreamArray;
const { streamObject } = StreamJsonStreamObject;
const { pick } = StreamJsonPick;
const { ignore } = StreamJsonIgnore;
const { replace } = StreamJsonReplace;
const { batch } = StreamJsonBatch;
const { fork } = StreamFork;
// ? Scenario selection flags: useOnlyChainedStreams=true runs chainedStreams
// ? as-is (scenario D ends in its own file sinks); when false, either a log
// ? stream (useLogStream=true, as scenario A expects) or a stringer + file
// ? sink is appended. collectOutput makes the log stream buffer every chunk
// ? and dump the assembled output at the end.
const useOnlyChainedStreams = true;
const collectOutput = false;
const useLogStream = false;
const numberObjToGenerate = 5;
// * (use with) B, D
/**
 * @type {StreamJsonStringer.StringerOptions}
 */
const stringerOptions = {
  // ? Join all top-level values into a single JSON array
  makeArray: true,
  // ? Reconstruct output from packed value tokens, except for strings, which
  // ? arrive chunked in scenarios B and D (packStrings: false)
  useValues: true,
  useStringValues: false
};
// * (use with) C
/**
 * @type {StreamJsonStringer.StringerOptions}
 */
// const stringerOptions = {
//   makeArray: true
// };
// * A (useLogStream=true)
// * Without stream-json
// const chainedStreams = [(x) => ({ ...x, a: `changed! (was ${x.a})` })];
// * B
// * For when string values might be too big for memory
// const chainedStreams = [
//   disassembler({
//     streamValues: false,
//     streamStrings: true,
//     packValues: true,
//     packStrings: false
//   }),
//   // ? Ignore filter needs packed keys
//   ignore({ filter: 'c' })
// ];
// * C
// * Filtering on individual object prop values when objects can fit in memory
// const chainedStreams = [
//   disassembler(),
//   ignore({ filter: 'c' }),
//   // ? Streamers need packed values
//   streamValues(),
//   (data) => (data.value.index % 2 === 1 ? data.value : null),
//   disassembler()
// ];
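// ? Incrementally reassembles the chunked string value of the given property
// ? from raw stream-json tokens: starts collecting at the property's keyValue
// ? token, appends every *Chunk token, and finishes at the following end*
// ? token. Returns an [isPacked, value] pair where value stays undefined
// ? until packing completes.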
const packedValuesMap = new Map();
const packProperty = function ({ name, value }, property) {
  let [isPacking, isPacked, packedValue] = packedValuesMap.get(property) || [
    false,
    false,
    ''
  ];
  if (!isPacked) {
    if (name === 'keyValue' && value === property) {
      isPacking = true;
    } else if (isPacking && name.endsWith('Chunk')) {
      packedValue += value;
    } else if (isPacking && name.startsWith('end')) {
      isPacking = false;
      isPacked = true;
    }
  }
  packedValuesMap.set(property, [isPacking, isPacked, packedValue]);
  return [isPacked, isPacked ? packedValue : undefined];
};
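// ? For example (hypothetical token sequence): given the tokens for
// ? { id: 'abc' } with packStrings=false, packProperty(data, 'id') sees
// ? keyValue:'id', then one or more stringChunk tokens ('abc'), then
// ? endString, after which it returns [true, 'abc']; every call before
// ? endString returns [false, undefined].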
// * D (useOnlyChainedStreams=true)
// * Filtering on individual object prop values without loading strings into
// * memory: the token stream is forked into two branches. One drops the "c"
// * property and stringifies everything else to ignore.output.json; the other
// * picks "c" and spills each bytes string, chunk by chunk, into its own
// * attachment file, renaming a temp file once id and type have been seen.
const chainedStreams = [
  disassembler({
    streamValues: false,
    streamStrings: true,
    packValues: true,
    packStrings: false
  }),
  fork([
    chain([
      ignore({ filter: 'c' }),
      stringer(stringerOptions),
      createWriteStream('ignore.output.json')
    ]),
    chain([
      pick({ filter: 'c' }),
      async function (data) {
        const getRealFilename = (packedId, packedType) => {
          return `ignore.attachment-${packedId}.${packedType}`;
        };
        const [idIsPacked, packedId] = packProperty(data, 'id');
        const [typeIsPacked, packedType] = packProperty(data, 'type');
        if (data.name === 'keyValue' && data.value === 'bytes') {
          // ? Key order is not guaranteed: if id/type haven't been seen yet,
          // ? stream into a temp file and rename it afterwards
          this.outputStream =
            idIsPacked && typeIsPacked
              ? createWriteStream(getRealFilename(packedId, packedType))
              : createWriteStream(
                  'ignore.tmp-' + randomBytes(4).readUInt32LE(0).toString(36)
                );
          this.outputStream.on('error', (error) => pipeline.destroy(error));
        } else if (this.outputStream) {
          if (this.outputStream.writable) {
            if (data.name === 'endString') {
              this.outputStream.end();
            } else if (data.name === 'stringChunk') {
              // ? Respect backpressure from the attachment file
              await new Promise((resolve) => {
                if (!this.outputStream.write(data.value)) {
                  this.outputStream.once('drain', () => resolve());
                } else {
                  resolve();
                }
              });
            }
          } else if (idIsPacked && typeIsPacked) {
            const path = this.outputStream.path;
            const finalizeCommit = () => {
              renameSync(path, `./ignore.attachment-${packedId}.${packedType}`);
            };
            if (!path.endsWith(getRealFilename(packedId, packedType))) {
              if (this.outputStream.closed) {
                finalizeCommit();
              } else {
                this.outputStream.on('close', () => finalizeCommit());
              }
            }
            packedValuesMap.clear();
            this.outputStream = undefined;
          }
        }
      }.bind({})
    ])
  ])
];
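// ? stream-fork's fork() returns a writable that copies every incoming token
// ? to all of the branch chains above, so both branches see the complete
// ? disassembled token stream. Since both branches terminate in file sinks,
// ? scenario D is meant to run with useOnlyChainedStreams=true so that no
// ? stringer or log stream gets appended after the fork.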
// ? Produces an object-mode Readable that emits numberObjToGenerate demo
// ? objects, one every 250ms, alternating key order between objects (which
// ? exercises scenario D's temp-file rename path when bytes precedes id/type)
function makeObjectGeneratorStream(numberObjToGenerate = 10) {
  async function* makeObjectGenerator(numberObjToGenerate) {
    for (let index = 0; index < numberObjToGenerate; index++) {
      // eslint-disable-next-line no-await-in-loop
      await delay(250);
      yield index % 2 === 0
        ? {
            a: 1,
            b: 'data',
            c: {
              id: (Math.random() + 1).toString(36).slice(7),
              type: 'txt',
              bytes: 'huge-big-data-file'
            },
            index
          }
        : {
            index,
            a: 'one',
            c: {
              bytes: 'huge-big-data-file',
              type: 'md',
              id: (Math.random() + 1).toString(36).slice(7)
            },
            b: 'zata'
          };
    }
  }
  const objectGenerator = makeObjectGenerator(numberObjToGenerate);
  return new Readable({
    objectMode: true,
    async read() {
      const { value, done } = await objectGenerator.next();
      this.push(done ? null : value);
    }
  });
}
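// ? Note: Readable.from(makeObjectGenerator(numberObjToGenerate)) would yield
// ? an equivalent object-mode Readable from the async generator directly; the
// ? manual read() implementation above is presumably part of the experiment.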
// ? Debug sink that logs every chunk it receives; with reduce=true it also
// ? collects the chunks and prints the assembled output when the stream ends
function makeLogStream(reduce = false) {
  let sawNonString = false;
  const chunks = [];
  return new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
      if (reduce) {
        chunks.push(chunk);
        sawNonString ||= typeof chunk !== 'string';
        console.log(`(collected) chunk (${encoding}):`, chunk);
      } else {
        console.log(`chunk (${encoding}):`, chunk);
      }
      callback(null);
    },
    final(callback) {
      if (reduce) {
        console.log(
          `total output:\n${
            sawNonString ? JSON.stringify(chunks, undefined, 2) : chunks.join('')
          }`
        );
      }
      callback(null);
    }
  });
}
console.log('stream test started');
const pipeline = useOnlyChainedStreams
  ? chain(chainedStreams)
  : chain([
      ...chainedStreams,
      ...(useLogStream
        ? [makeLogStream(collectOutput)]
        : [stringer(stringerOptions), createWriteStream('ignore.output.json')])
    ]);
pipeline.on('error', (error) => console.error('final error handler:', error));
makeObjectGeneratorStream(numberObjToGenerate).pipe(pipeline);
pipeline.on('end', () => console.log('stream test ended'));
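// ? With the default flags (scenario D), a run produces ignore.output.json
// ? (every generated object minus its "c" property, joined into one JSON
// ? array) plus one ignore.attachment-<id>.<type> file per object, each
// ? containing that object's bytes string.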