Skip to content

Instantly share code, notes, and snippets.

@tomfun
Last active March 8, 2025 21:44
Show Gist options
  • Save tomfun/14563c086b3e0ebeb3803fb3340d31ab to your computer and use it in GitHub Desktop.
Save tomfun/14563c086b3e0ebeb3803fb3340d31ab to your computer and use it in GitHub Desktop.
Node js stdin.on('data', ...) in action testing

How to test application stream

How to test node streams in node using linux tools and scripts

The easiest test:

echo -e "line1\nline2\nline3" | node pause-test.js

Rate limit with pv:

The easiest test:

echo -e "line1\nline2\nline3" | pv -L 10 | node pause-test.js

Send a file after text:

echo -e "line1\nline2\nline3" | cat - .nvmrc | node pause-test.js

Write a script with sleep and commands:

{                   # Start a command block
  echo -e "line1\nline2\nline3" &&  # Print three lines
  sleep 1 &&          # Wait for 1 second
  echo 'Bye!'      # Print "Bye!"
} | node pause-test.js

Enable bufferring:

stdbuf --output=10 bash -c '
  echo -e "line1\nline2\nwait for creaapy random non readable chars\n\n" | pv -L 10 -tb -N " Print three lines " -i 30;  # Print three lines but rate limited
  sleep 1;
  dd if=/dev/urandom bs=3 count=40 2>/dev/null | pv -L 30 -tb -N " Random chars " -i 30;
  sleep 1;
  echo -e "\n\n..end of random bytes\nBye"
' | node pause-test.js

With pipe (fifo)

mkfifo stdin
node pause-test.js < stdin

In another terminal

cat - > stdin

Links

stdbuf - Run COMMAND, with modified buffering operations for its

dd - convert and copy a file standard streams.

mkfifo - make a FIFO special file with name pathname. mode specifies the FIFO's permissions.

pv - monitor the progress of data through a pipe

#!/usr/bin/env node
const fs = require('fs')
const LINES = 2
let partialBuffer = Buffer.alloc(0)
let wantClose = false
let exiting = false
process.stdin.on('data', (chunk) => {
const hrtime = process.hrtime()
let start = 0
let newlineIndex
process.stderr.write('ch' + chunk.length + '\n')
// Use Buffer.indexOf to find LF (10). No explicit loop over each byte.
while ((newlineIndex = chunk.indexOf(10, start)) !== -1) {
// Slice out the chunk for this line.
let lineBuffer = chunk.slice(start, newlineIndex)
// Remove a trailing CR (13) if present.
if (lineBuffer.length && lineBuffer[lineBuffer.length - 1] === 13) {
lineBuffer = lineBuffer.slice(0, lineBuffer.length - 1)
}
// If a partial line was buffered from a previous chunk, prepend it.
if (partialBuffer.length) {
lineBuffer = Buffer.concat([partialBuffer, lineBuffer])
partialBuffer = Buffer.alloc(0)
}
// Update the hash by concatenating the previous hash buffer with the current line buffer.
process.stderr.write('nl: ' + lineBuffer.toString('utf8') + '\n')
start = newlineIndex + 1
}
// Buffer any remaining data that didn't end with a newline.
if (start < chunk.length) {
partialBuffer = Buffer.concat([partialBuffer, chunk.slice(start)])
process.stderr.write('partial: ' + partialBuffer.toString('utf8') + '\n')
}
})
// logger.on('close', onClose)
setTimeout(() => {
process.stderr.write(' 12s ! simulate wantClose by error !\n')
wantClose = true
}, 12000)
function cleanExit() {
exiting = true
let sending = false
setTimeout(() => {
console.warn('simulate sending')
sending = false
}, 3000)
const hrtime = process.hrtime()
let waited = 0
const rr = setInterval(() => {
waited++
// todo:
// if (process.exitCode === 0) {
// process.exitCode = 1
// }
if (partialBuffer.length && waited < 10) {
if (waited === 9) {
console.error({
streamEnd: 'Stream ended unexpectedly',
hrtime: hrtime,
partialBufferPart: partialBuffer.toString('utf8').slice(0, 4000),
})
}
console.debug('wait for new data. stream corrupted')
} else if (sending) {
console.debug('wait for data to be logged')
} else {
clearInterval(rr)
process.exitCode = 0
}
}, 1000)
}
process.on('exit', cleanExit)
process.stdin.on('end', (e) => {
console.warn('end', e, 'call onExit')
cleanExit()
})
process.stdin.on('error', (err) => {
hrtime = process.hrtime()
console.error('Stream error:', err, 'WHAT TO DO?')
});
(['SIGINT', 'SIGTERM', 'SIGHUP', 'close']).forEach((eventName) => {
process.once(eventName, cleanExit)
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment