Skip to content

Instantly share code, notes, and snippets.

@DIY0R
Created September 13, 2024 10:20
Show Gist options
  • Save DIY0R/dce3836912bb08a04ff9f2a5c665b3ee to your computer and use it in GitHub Desktop.
Save DIY0R/dce3836912bb08a04ff9f2a5c665b3ee to your computer and use it in GitHub Desktop.
Cancelling streams with AbortController
import fs from "fs";
import { addAbortSignal } from "stream";
import { setTimeout as delay } from "timers/promises";
const controller = new AbortController();
setTimeout(() => controller.abort(), 50);
const inputStream = addAbortSignal(
controller.signal,
fs.createReadStream("text.txt")
);
const outputStream = fs.createWriteStream("output.txt");
async function process(chunk) {
console.log(`Processing chunk: ${chunk.length} bytes`);
// Simulating some async processing
await delay(10);
return chunk;
}
(async () => {
try {
for await (const chunk of inputStream) {
const processedChunk = await process(chunk);
if (!outputStream.write(processedChunk)) {
// Handle backpressure
await new Promise((resolve) => outputStream.once("drain", resolve));
}
}
console.log("Stream processing completed");
} catch (e) {
if (e.name === "AbortError") {
console.log("The operation was cancelled");
} else {
console.error("An error occurred:", e);
throw e;
}
} finally {
outputStream.end();
await new Promise((resolve) => outputStream.once("finish", resolve));
console.log("Output stream closed");
}
})();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment