Created
August 13, 2025 18:06
-
-
Save fell-lucas/64215dd77a3c6aa0477ea0e0aa65bf9f to your computer and use it in GitHub Desktop.
Node.js stream backpressure example/demo
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Node.js backpressure demo
// Run: node demo.js
const readline = require('readline'); | |
const { Writable } = require('stream'); | |
// Tweak these to make the effect more or less visible
const TOTAL = 60; // total chunks to produce
const DELAY_MS = 300; // consumer processing time per chunk (slower = more backpressure)
const HWM = 12; // writable highWaterMark (buffer size in objectMode)

// Counters and state (mutated while the demo runs)
let produced = 0; // chunks handed to sink.write() so far
let consumed = 0; // chunks the sink has finished processing
let finished = false; // set once the sink emits 'finish' or the user hits Ctrl+C

// Terminal helpers
let refreshHandle = null; // setInterval handle of the redraw ticker; null while stopped

// Spinner frames: plain ASCII on Windows, braille dots elsewhere.
// Frozen because it is a shared read-only lookup table.
const SPINNER = Object.freeze(
  process.platform === 'win32'
    ? ['-', '\\', '|', '/']
    : ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'],
);
let frame = 0; // index of the spinner frame shown most recently
const stdout = process.stdout;

// ANSI SGR escape sequences used to colorize the dashboard (frozen: read-only).
const COLORS = Object.freeze({
  reset: "\x1b[0m",
  dim: "\x1b[2m",
  green: "\x1b[32m",
  yellow: "\x1b[33m",
  blue: "\x1b[34m",
  cyan: "\x1b[36m",
});
/**
 * Build a fixed-width progress bar followed by a right-aligned percentage.
 *
 * @param {number} current - Units completed so far.
 * @param {number} total - Units expected overall; 0 renders an empty bar.
 * @param {number} width - Bar width in character cells.
 * @returns {string} For example "█████░░░░░  50%" for bar(5, 10, 10).
 */
function bar(current, total, width) {
  const rawRatio = total === 0 ? 0 : current / total;
  // Clamp to [0, 1] so negative or NaN input renders as 0% instead of
  // leaking "-50%" or "NaN%" into the dashboard.
  const ratio = Number.isNaN(rawRatio) ? 0 : Math.min(1, Math.max(0, rawRatio));
  const filled = Math.max(0, Math.min(width, Math.round(ratio * width)));
  const empty = Math.max(0, width - filled);
  const percent = String(Math.round(ratio * 100)).padStart(3);
  return `${"█".repeat(filled)}${"░".repeat(empty)} ${percent}%`;
}
/**
 * Draw the writable buffer as one cell per slot plus a "length/hwm" counter.
 *
 * @param {number} length - Items currently buffered; may exceed hwm, in which
 *   case the cells are capped at hwm but the counter shows the real value.
 * @param {number} hwm - High-water mark, i.e. the number of cells drawn.
 * @returns {string} For example "[■■■··]  3/5" for bufferBar(3, 5).
 */
function bufferBar(length, hwm) {
  const filled = Math.max(0, Math.min(hwm, length));
  const free = Math.max(0, hwm - filled);
  // Pad the counter to the digit count of hwm (at least 2, as before) so the
  // line width stays stable for large high-water marks.
  const countWidth = Math.max(2, String(hwm).length);
  const count = String(length).padStart(countWidth);
  return `[${"■".repeat(filled)}${"·".repeat(free)}] ${count}/${hwm}`;
}
/**
 * Hide the terminal cursor by writing the corresponding escape sequence to
 * stdout. Despite the name, nothing on screen is cleared.
 */
function clearAndHideCursor() {
  const HIDE_CURSOR = "\x1b[?25l";
  stdout.write(HIDE_CURSOR);
}
/** Make the terminal cursor visible again (counterpart of the hide sequence). */
function showCursor() {
  const SHOW_CURSOR = "\x1b[?25h";
  stdout.write(SHOW_CURSOR);
}
/**
 * Start the periodic redraw that keeps the spinner animating between stream
 * events. Does nothing if a ticker is already running.
 *
 * @param {number} [interval=120] - Milliseconds between redraws.
 */
function startTicker(interval = 120) {
  const alreadyRunning = Boolean(refreshHandle);
  if (alreadyRunning) {
    return;
  }

  const tick = () => {
    // Once the demo is finished the final frame must stay untouched.
    if (finished) {
      return;
    }
    render();
  };
  refreshHandle = setInterval(tick, interval);
}
/** Stop the periodic redraw started by startTicker(); no-op when not running. */
function stopTicker() {
  if (!refreshHandle) {
    return;
  }
  clearInterval(refreshHandle);
  refreshHandle = null;
}
// Number of lines written by the previous frame; render() rewinds the cursor
// by this many rows before drawing the next one.
let linesPrinted = 0;

/**
 * Draw the dashboard (progress bars, buffer fill, status) to stdout,
 * overwriting the previous frame in place.
 *
 * Reads the module-level counters and the sink's writableLength /
 * writableHighWaterMark, and advances the spinner by one frame per call.
 */
function render() {
  // CSI 2K erases the whole current line. Each line is cleared before being
  // rewritten so a shorter line (e.g. "DONE" replacing "RUNNING ⠋") does not
  // leave stale characters from the previous frame behind.
  const ERASE_LINE = "\x1b[2K";

  const width = Math.max(20, Math.min(40, (stdout.columns || 80) - 30));
  frame = (frame + 1) % SPINNER.length;
  const spin = SPINNER[frame];
  const status = finished
    ? `${COLORS.green}DONE ${COLORS.reset}`
    : `${COLORS.cyan}RUNNING ${spin}${COLORS.reset}`;
  const header = `${COLORS.blue}Node.js backpressure demo${COLORS.reset}`;
  const sub = `${COLORS.dim}(slow consumer forces producer to pause when internal buffer is full)${COLORS.reset}`;
  const lines = [
    "",
    header,
    sub,
    "",
    `Producer ${bar(produced, TOTAL, width)}`,
    `Consumer ${bar(consumed, TOTAL, width)}`,
    `Consumer Buffer ${bufferBar(sink.writableLength, sink.writableHighWaterMark)}`,
    "",
    `Status ${status}`,
    `${COLORS.dim}Tip: backpressure = slow consumers signal producers to pause/slow down.${COLORS.reset}`,
    "",
    "",
  ];

  // Every frame ends with a newline, so the cursor sits at column 0 directly
  // below the previous frame; moving up by its height returns to its first row.
  if (linesPrinted > 0) {
    readline.moveCursor(stdout, 0, -linesPrinted);
  }
  stdout.write(lines.map((line) => `${ERASE_LINE}${line}\n`).join(""));
  linesPrinted = lines.length;
}
// Actual demo code starts here
/**
 * Object-mode Writable that simulates a slow consumer: every chunk takes
 * DELAY_MS milliseconds to "process" before the stream is told it may
 * continue. The chunk value itself is ignored.
 */
class SlowWritable extends Writable {
  /**
   * @param {Function} onConsumed - Called with no arguments each time a chunk
   *   has finished processing, just before the write callback fires.
   */
  constructor(onConsumed) {
    const options = { highWaterMark: HWM, objectMode: true };
    super(options);
    this.onConsumed = onConsumed;
  }

  /**
   * Writable hook: acknowledge the chunk after the simulated processing delay.
   */
  _write(chunk, encoding, callback) {
    const finishChunk = () => {
      this.onConsumed();
      callback();
    };
    setTimeout(finishChunk, DELAY_MS);
  }
}
// The slow consumer. Each processed chunk bumps the counter and redraws.
const sink = new SlowWritable(function onChunkConsumed() {
  consumed += 1;
  render();
});
/**
 * Write chunks into the sink until either all TOTAL chunks have been written
 * (then end the sink) or write() reports a full buffer (then wait for 'drain'
 * and resume). This is the backpressure-aware producer loop.
 */
function produce() {
  let hasRoom = true;
  while (hasRoom && produced < TOTAL) {
    // The value is unused by the sink; it is kept for realism.
    hasRoom = sink.write(produced + 1);
    produced += 1;
  }

  if (hasRoom) {
    // Every chunk was accepted without hitting the high-water mark.
    sink.end();
    return;
  }

  // Buffer is full: pause production until the sink emits 'drain'.
  render();
  sink.once('drain', function resumeProduction() {
    render();
    produce();
  });
}
// When the sink has flushed everything, draw the final "DONE" frame and
// release the terminal.
sink.on('finish', function onSinkFinished() {
  finished = true;
  render();
  cleanup();
});
/**
 * Stop the redraw ticker immediately and make the cursor visible again after
 * a short delay.
 */
function cleanup() {
  const SHOW_CURSOR_DELAY_MS = 200;
  stopTicker();
  setTimeout(showCursor, SHOW_CURSOR_DELAY_MS);
}
// Ctrl+C: draw a final frame, stop the ticker, restore the cursor, then exit.
process.on('SIGINT', () => {
  finished = true;
  render();
  cleanup();
  // cleanup() only schedules showCursor() on a timer, and process.exit()
  // terminates before that timer can fire. Restore the cursor synchronously
  // so the user's terminal is not left with a hidden cursor.
  showCursor();
  process.exit();
});
// Entry point: prepare the terminal, start the spinner ticker, draw the first
// frame, then begin producing chunks.
(() => {
  clearAndHideCursor();
  startTicker(100);
  render();
  produce();
})();
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment