Skip to content

Instantly share code, notes, and snippets.

@fell-lucas
Created August 13, 2025 18:06
Show Gist options
  • Save fell-lucas/64215dd77a3c6aa0477ea0e0aa65bf9f to your computer and use it in GitHub Desktop.
Save fell-lucas/64215dd77a3c6aa0477ea0e0aa65bf9f to your computer and use it in GitHub Desktop.
Node.js stream backpressure example/demo
// Node.js backpressure demo
// Run: node demo.js
const readline = require('readline');
const { Writable } = require('stream');
// Demo parameters: adjust these to make the backpressure effect more or less visible.

// Total number of chunks the producer writes.
const TOTAL = 60;
// Time the consumer spends on each chunk, in milliseconds (slower = more backpressure).
const DELAY_MS = 300;
// highWaterMark passed to the writable (its buffer size in objectMode).
const HWM = 12;

// Progress counters and the completion flag read by the renderer.
let produced = 0;
let consumed = 0;
let finished = false;
// Terminal helpers

// Handle of the periodic redraw timer; null while no ticker is running.
let refreshHandle = null;

// Spinner frames: plain ASCII on Windows, braille dots everywhere else.
const SPINNER =
  process.platform !== 'win32'
    ? ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏']
    : ['-', '\\', '|', '/'];

// Index of the spinner frame shown most recently.
let frame = 0;

const stdout = process.stdout;

// ANSI SGR escape sequences used to colour the output.
const COLORS = {
  reset: '\x1b[0m',
  dim: '\x1b[2m',
  green: '\x1b[32m',
  yellow: '\x1b[33m',
  blue: '\x1b[34m',
  cyan: '\x1b[36m',
};
/**
 * Build a text progress bar followed by a percentage.
 *
 * @param {number} current - Amount completed so far.
 * @param {number} total - Amount that counts as 100%; a total of 0 renders as 0%.
 * @param {number} width - Number of cells in the bar.
 * @returns {string} Filled cells, empty cells, a space, then the percentage
 *   left-padded to three characters and a "%" sign.
 */
function bar(current, total, width) {
  // Progress fraction, capped at 1 so an overshoot still renders as full.
  let fraction = 0;
  if (total !== 0) {
    fraction = Math.min(1, current / total);
  }

  const solidCells = Math.max(0, Math.min(width, Math.round(fraction * width)));
  const hollowCells = Math.max(0, width - solidCells);
  const percentLabel = String(Math.round(fraction * 100)).padStart(3);

  return "█".repeat(solidCells) + "░".repeat(hollowCells) + " " + percentLabel + "%";
}
/**
 * Build a bracketed gauge showing how full a buffer is.
 *
 * @param {number} length - Current number of buffered items.
 * @param {number} hwm - Buffer capacity (the high water mark); also the gauge width.
 * @returns {string} "[" + filled/empty slots + "] " + length (left-padded to
 *   two characters) + "/" + hwm.
 */
function bufferBar(length, hwm) {
  // Never draw more filled slots than the gauge has, nor a negative count.
  const occupied = Math.max(0, Math.min(hwm, length));
  const vacant = Math.max(0, hwm - occupied);
  const slots = "■".repeat(occupied) + "·".repeat(vacant);
  const countLabel = String(length).padStart(2);
  return `[${slots}] ${countLabel}/${hwm}`;
}
/**
 * Hide the terminal cursor by writing the "hide cursor" escape sequence to
 * stdout. Despite the name, nothing is cleared here.
 */
function clearAndHideCursor() {
  const hideCursorSequence = "\x1b[?25l";
  stdout.write(hideCursorSequence);
}
/**
 * Make the terminal cursor visible again by writing the "show cursor" escape
 * sequence to stdout.
 */
function showCursor() {
  const showCursorSequence = "\x1b[?25h";
  stdout.write(showCursorSequence);
}
/**
 * Start the periodic redraw timer, unless one is already running.
 *
 * @param {number} [interval=120] - Milliseconds between redraws.
 */
function startTicker(interval = 120) {
  if (!refreshHandle) {
    refreshHandle = setInterval(function redrawWhileRunning() {
      // Once the run is finished the final frame stays as it is.
      if (finished) return;
      render();
    }, interval);
  }
}
/**
 * Stop the periodic redraw timer if one is running and forget its handle.
 * Does nothing when no ticker is active.
 */
function stopTicker() {
  if (!refreshHandle) return;
  clearInterval(refreshHandle);
  refreshHandle = null;
}
// Number of lines written by the previous render() call; used to move the
// cursor back to the top of the frame before redrawing.
let linesPrinted = 0;

/**
 * Redraw the whole status frame in place.
 *
 * Reads the module-level progress state (`produced`, `consumed`, `finished`)
 * and the sink's `writableLength` / `writableHighWaterMark`, advances the
 * spinner by one frame, and writes the frame to `stdout`. On every call after
 * the first, the cursor is moved back up over the previous frame and that
 * frame is erased before the new one is written.
 */
function render() {
  // Bar width follows the terminal width (80 when unknown), clamped to 20..40.
  const width = Math.max(20, Math.min(40, (stdout.columns || 80) - 30));

  frame = (frame + 1) % SPINNER.length;
  const spin = SPINNER[frame];

  const status = finished
    ? `${COLORS.green}DONE ${COLORS.reset}`
    : `${COLORS.cyan}RUNNING ${spin}${COLORS.reset}`;
  const header = `${COLORS.blue}Node.js backpressure demo${COLORS.reset}`;
  const sub = `${COLORS.dim}(slow consumer forces producer to pause when internal buffer is full)${COLORS.reset}`;

  const lines = [
    "",
    header,
    sub,
    "",
    `Producer ${bar(produced, TOTAL, width)}`,
    `Consumer ${bar(consumed, TOTAL, width)}`,
    `Consumer Buffer ${bufferBar(sink.writableLength, sink.writableHighWaterMark)}`,
    "",
    `Status ${status}`,
    `${COLORS.dim}Tip: backpressure = slow consumers signal producers to pause/slow down.${COLORS.reset}`,
    "",
    "",
  ];

  if (linesPrinted > 0) {
    readline.moveCursor(stdout, 0, -linesPrinted);
    // Erase the previous frame first. A new line can be shorter than the one
    // it replaces (e.g. "DONE " over "RUNNING <spinner>", or a narrower bar
    // after a terminal resize), which would otherwise leave stale characters.
    readline.clearScreenDown(stdout);
  }

  stdout.write(lines.join("\n") + "\n");
  linesPrinted = lines.length;
}
// Actual demo code starts here
/**
 * Object-mode writable that simulates a slow consumer: every chunk takes
 * DELAY_MS milliseconds to "process" before the stream accepts the next one.
 */
class SlowWritable extends Writable {
  /**
   * @param {Function} onConsumed - Called with no arguments each time a chunk
   *   has finished processing.
   */
  constructor(onConsumed) {
    const streamOptions = { objectMode: true, highWaterMark: HWM };
    super(streamOptions);
    this.onConsumed = onConsumed;
  }

  /**
   * Writable hook: ignore the chunk's value, wait DELAY_MS, notify the
   * listener, then signal completion so the next chunk can be processed.
   */
  _write(_chunk, _enc, cb) {
    const finishChunk = () => {
      this.onConsumed();
      cb();
    };
    setTimeout(finishChunk, DELAY_MS);
  }
}
// The slow consumer. Each processed chunk bumps the counter and redraws.
const sink = new SlowWritable(function countConsumedChunk() {
  consumed += 1;
  render();
});
/**
 * Write chunks into the sink until either all TOTAL chunks have been written
 * or the sink reports that its buffer is full.
 *
 * When a write is refused (write() returned false), production stops and
 * resumes from the sink's next 'drain' event. Once every chunk has been
 * written, the sink is ended.
 */
function produce() {
  let hasRoom = true;

  while (hasRoom && produced < TOTAL) {
    const chunk = produced + 1; // value unused by sink, kept for realism
    hasRoom = sink.write(chunk);
    produced += 1;
  }

  if (!hasRoom) {
    // Backpressure: show the full buffer, then wait for 'drain' to continue.
    render();
    sink.once('drain', () => {
      render();
      produce();
    });
    return;
  }

  sink.end();
}
// When the sink emits 'finish': mark the run as done, draw the final frame,
// and release the terminal.
sink.on('finish', function onSinkFinished() {
  finished = true;
  render();
  cleanup();
});
/**
 * Stop the redraw ticker and restore the terminal cursor.
 * The cursor is restored from a timer 200 ms later, not synchronously.
 */
function cleanup() {
  stopTicker();
  setTimeout(showCursor, 200);
}
// Ctrl+C: draw a final frame, release the terminal, and exit.
process.on('SIGINT', () => {
  finished = true;
  render();
  cleanup();
  // cleanup() only schedules showCursor() on a timer, and process.exit()
  // below ends the process before that timer can fire. Restore the cursor
  // here so an interrupted run does not leave the terminal cursor hidden.
  showCursor();
  process.exit();
});
// Start-up sequence: hide the cursor, start the periodic redraw (every
// 100 ms), draw the first frame, then begin writing chunks into the sink.
clearAndHideCursor();
startTicker(100);
render();
produce();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment