Last active
April 22, 2023 23:31
-
-
Save abdulloooh/d4c881368971b40c35aded364c45513d to your computer and use it in GitHub Desktop.
Advanced Node.js Streams
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// BUFFER
// Load the whole content into a buffer (i.e. into memory) before writing it out to the user
const http = require("http"); | |
const media = "./testvid.mp4"; | |
const fs = require("fs"); | |
// Buffer approach: the entire file is read into memory and sent in one write.
http
  .createServer((req, res) => {
    fs.readFile(media, (err, data) => {
      if (err) {
        // Without this early return the client would receive a 200 response
        // with an empty body whenever the file cannot be read.
        console.log({ err });
        res.writeHead(500, { "Content-Type": "text/plain" });
        res.end("Unable to read media file");
        return;
      }
      // writeHead is the documented method; writeHeader is only a legacy alias.
      res.writeHead(200, { "Content-Type": "video/mp4" });
      res.end(data);
    });
  })
  .listen(3000, () => {
    console.log("buffer - port 3000");
  });
// STREAM | |
// read the content chunk by chunk | |
const http = require("http"); | |
const fs = require("fs"); | |
const media = "./testvid.mp4"; | |
// Stream approach: the file is sent chunk by chunk instead of being buffered whole.
http
  .createServer((req, res) => {
    const source = fs.createReadStream(media);

    // pipe() returns the destination, so an "error" listener chained after it
    // never sees read errors. Listen on the source itself.
    source.on("error", (err) => {
      console.log(err);
      if (!res.headersSent) {
        res.writeHead(500, { "Content-Type": "text/plain" });
        res.end("Unable to read media file");
      } else {
        // Headers are already out; all we can do is abort the response.
        res.destroy(err);
      }
    });

    // Only promise a video once the file has actually been opened.
    source.once("open", () => {
      res.writeHead(200, { "Content-Type": "video/mp4" });
      source.pipe(res);
    });

    res.on("error", console.log);
    // pipe() does not close the source when the client goes away.
    res.on("close", () => source.destroy());
  })
  .listen(3000, () => console.log("stream - port 3000"));
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// CREATING readable stream | |
const { stdout } = require("process"); | |
const { Readable } = require("stream"); | |
// convert the peaks array into a stream
const peaks = ["adee", "iee", "yakudh", "sulop", "potilo", "wertu"]; | |
/**
 * Readable stream that hands out the elements of an array one per read request.
 *
 * The stream is created in object mode, so each pushed element is delivered
 * as-is (strings stay strings, objects stay objects). Readable also accepts
 * other options such as `{ encoding: "utf-8" }` or no options at all.
 */
class StreamFromArray extends Readable {
  /**
   * @param {Array} array - values to emit, in order
   */
  constructor(array) {
    super({ objectMode: true });
    this.array = array;
    this.index = 0;
  }

  /**
   * Supplies the next element, or signals end-of-stream with `null` once
   * every element has been pushed.
   */
  _read() {
    const exhausted = this.index >= this.array.length;
    if (exhausted) {
      this.push(null);
      return;
    }
    // Object mode would equally allow pushing a wrapper object here,
    // e.g. { index, chunk }, instead of the raw element.
    this.push(this.array[this.index]);
    this.index += 1;
  }
}
// Consume the array-backed stream through several listeners and a pipe.
const peakStrem = new StreamFromArray(peaks);
/**
 * _Events_
 * data, error, close, end, pause, readable, resume
 */
peakStrem.on("data", (chunk) => console.log("\nchunk ", chunk, "\n"));
// Each chunk is the pushed array element itself (a string here), so `length`
// is its character count. This listener previously referenced an undeclared
// `peakStream` and threw a ReferenceError.
peakStrem.on("data", (chunk) => console.log(chunk.length));
peakStrem.on("end", () => console.log("DONE!"));
peakStrem.pipe(stdout); // stdout is a writable stream
// USING existing READABLE STREAMS
const fs = require("fs"); | |
// Demo: drive a file read stream by hand from the keyboard.
const readStream = fs.createReadStream(
  "./Ex_Files_Advanced_NodeJS/Exercise Files/Ch02/powder-day.mp4"
);

// FLOWING MODE: attaching a "data" listener makes chunks arrive on their own.
readStream
  .on("data", console.log)
  .on("end", () => console.log("Done!"))
  .on("error", console.log);

// NON-FLOWING MODE: stop the automatic flow; chunks are now pulled with read().
readStream.pause();

// stdin is itself a readable stream, so keyboard input can steer readStream:
// any line pulls one more chunk, while the word "finish" lets the rest flow.
process.stdin.on("data", (input) => {
  const command = input.toString().trim();
  if (command === "finish") {
    // RETURN TO FLOWING MODE
    readStream.resume();
    return;
  }
  readStream.read();
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Writable streams can be used to catch data from a readable stream and send it out | |
* They are ALSO everywhere; fs,stdout, npm modules etc | |
*/ | |
const { createReadStream, createWriteStream } = require("fs"); | |
// Manual copy: forward every chunk from the read stream to the write stream.
// The return value of write() is ignored here, so there is no backpressure handling.
const readStream = createReadStream(
  "./Ex_Files_Advanced_NodeJS/Exercise Files/Ch02/powder-day.mp4"
);
const writeStream = createWriteStream("./copy1.mp4");

readStream
  .on("data", (piece) => {
    writeStream.write(piece);
  })
  .on("error", console.log)
  // Source exhausted: tell the destination no more data is coming.
  .on("end", () => {
    writeStream.end();
  });

writeStream
  .on("close", () => {
    // process.stdout is a writable stream too
    process.stdout.write("File copied\n");
  })
  .on("error", console.log);
/**
 * DIFFERENCE BETWEEN consuming a stream through "data" events and using pipe()
 * They do roughly the same thing; "pipe" and "unpipe" are themselves events, since streams are instances of EventEmitter.
 * BUT, from https://stackoverflow.com/questions/53774176/difference-between-pipe-and-stream-in-node-js
 * You should use the pipe method because the flow of data will be automatically managed so that
 * the destination Writable stream is not overwhelmed by a faster Readable stream.
 * If your readable stream is faster than the writable stream, you may experience data loss when calling dest.write(data) directly,
 * so it is better to use src.pipe(dest);
 */
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Handling Backpressure
 *
 * 2 ways, not necessarily mutually exclusive
 *
 * 1. Pause and Drain
 * 2. Set a big highWaterMark
 *
 * Think of piping water from one bucket to another through a hose: highWaterMark is the size of the hose,
 * and the bigger the hose, the less the backpressure. A regular hose may still cause backpressure
 * if water is poured in very fast from the source, hence the need to pause the pour and let the hose drain.
 *
 * But the larger the highWaterMark, the more memory is consumed.
 */
const { createReadStream, createWriteStream } = require("fs"); | |
// Manual copy with backpressure handling: pause the source whenever the
// destination reports a full buffer, and resume once it has drained.
const readStream = createReadStream(
  "./Ex_Files_Advanced_NodeJS/Exercise Files/Ch02/powder-day.mp4"
);
// A large highWaterMark gives the destination a bigger internal buffer
// (omit the options object to use the default size).
const writeStream = createWriteStream("./copy1.mp4", {
  highWaterMark: 12982818,
});

readStream
  .on("data", (piece) => {
    // write() returns false once the destination buffer is over its highWaterMark.
    if (writeStream.write(piece)) return;
    console.log("backpressure");
    readStream.pause();
  })
  .on("error", console.log)
  .on("end", () => {
    writeStream.end();
  });

writeStream
  .on("drain", () => {
    // Buffer emptied: safe to let the source flow again.
    console.log("drained");
    readStream.resume();
  })
  .on("close", () => {
    // process.stdout is a writable stream too
    process.stdout.write("File copied\n");
  })
  .on("error", console.log);
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * pipe() handles all the backpressure, draining, resuming etc. for us, with less code.
 * The pipe function can pass data from any readable stream to any writable stream.
 */
const { createReadStream, createWriteStream } = require("fs"); | |
// Copy a file with pipe(), which handles backpressure internally.
const readStream = createReadStream(
  "./Ex_Files_Advanced_NodeJS/Exercise Files/Ch02/powder-day.mp4"
);
const writeStream = createWriteStream("./copy1.mp4");

// pipe() does not forward source errors to the destination, so the source
// needs its own listener; on failure the half-written destination is closed.
readStream.on("error", (err) => {
  console.error(err);
  writeStream.destroy();
});

readStream
  .pipe(writeStream)
  // "finish" fires only after all data has been written; "close" would also
  // fire after a failure and report a copy that never completed.
  .on("finish", () => console.log("File Copied!"))
  .on("error", console.error);

// let's write text: everything arriving on stdin is saved to file1.txt
const textStream = createWriteStream("./file1.txt");
textStream.on("error", console.error);
process.stdin.pipe(textStream);
/**
 * We can write into textStream from the console in several ways
 *
 * 1. Run the program, leave stdin open and type your data; it is automatically piped into textStream
 * 2. echo on the console and use a `unix` pipe into our node process e.g. echo "hello world" | node .
 * 3. Read from a file on the console and unix-pipe it into the node program e.g. cat ../sample.txt | node .
 *
 * Basically, in all 3 cases above the data is passed into the node program, arrives on the readable stdin,
 * and is then piped into the writable textStream
 */
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* DUPLEX STREAM | |
* | |
* readStream can pipe into it, and it can pipe into writeStream | |
* It is a middle section. | |
* Think of it as a middleware that can be utilized to do some in-processing e.g | |
* - Report on data being sent, like total size moved | |
* - Throttle data passage i.e slow down | |
*/ | |
const { Duplex, PassThrough } = require("stream"); | |
const { createReadStream, createWriteStream } = require("fs"); | |
// Source video and destination copy used by the throttle/report pipeline.
const readStream = createReadStream("./Ex_Files_Advanced_NodeJS/Exercise Files/Ch02/powder-day.mp4");
const writeStream = createWriteStream("./copy1.mp4");
/**
 * Duplex stream that forwards every written chunk unchanged to its readable
 * side, but waits `delay_ms` milliseconds before accepting the next chunk.
 */
class Throttle extends Duplex {
  /**
   * @param {number} delay_ms - pause, in milliseconds, after each written chunk
   */
  constructor(delay_ms) {
    super();
    this.delay = delay_ms;
  }

  /**
   * Passes the chunk through, then delays the write callback so the
   * upstream writer is slowed down.
   */
  _write(chunk, encoding, callback) {
    this.push(chunk);
    setTimeout(callback, this.delay);
  }

  /** Nothing to do: data is pushed from _write as it arrives. */
  _read() {}

  /**
   * Called once the writable side has been ended: close the readable side.
   * The callback must be invoked, otherwise "finish" is never emitted.
   */
  _final(callback) {
    this.push(null);
    callback();
  }
}
// Pipeline: file -> throttle (slows the flow) -> report (counts bytes) -> copy.
const report = new PassThrough();
const throttle = new Throttle(100);

// pipe() does not propagate errors along the chain, and an "error" event
// without a listener crashes the process, so every stage gets a listener.
for (const stage of [readStream, throttle, report, writeStream]) {
  stage.on("error", console.log);
}

readStream.pipe(throttle).pipe(report).pipe(writeStream);

// Running total of the bytes that have passed through the report stage.
let totalSize = 0;
report.on("data", (chunk) => {
  totalSize += chunk.length;
  console.log(`_Total moved_: ${totalSize} bytes`);
});
/** | |
* TRANSFORM STREAM | |
* | |
* Transform Stream is a form of Duplex stream likewise that sits as middle line | |
* Basically used to transform data from readStream before passing to writeStream e.g | |
* replace some characters | |
* encrypt/decrypt | |
* compress etc | |
*/ | |
const { stdin, stdout } = require("process"); | |
const { Transform } = require("stream"); | |
/**
 * Transform stream that replaces every word character (`\w`) passing through
 * it with a fixed replacement, leaving all other characters untouched.
 */
class ReplaceText extends Transform {
  /**
   * @param {string} char - replacement emitted for each word character
   */
  constructor(char) {
    super();
    this.replaceChar = char;
  }

  /**
   * Masks one chunk and pushes the result downstream.
   */
  _transform(chunk, encoding, callback) {
    // A replacer function keeps the replacement literal; a plain string
    // would have patterns such as "$&" interpreted by String.prototype.replace.
    this.push(chunk.toString().replace(/\w/g, () => this.replaceChar));
    callback();
  }

  /**
   * Called after the last chunk, before the stream ends; this is the place to
   * emit any data held back by _transform. Nothing is buffered here, but the
   * callback must still be invoked or the stream never emits "end".
   */
  _flush(callback) {
    callback();
  }
}
// Mask everything typed on stdin with "x" and echo the result to stdout.
const xTreme = new ReplaceText("x");
const masked = stdin.pipe(xTreme);
masked.pipe(stdout);
console.log("start streaming data in i.e start typing");
/**
 crypto is a built-in Node.js module (not an npm package) that provides transform streams
 which can encrypt data chunk by chunk and then pass the encrypted data to the write stream,
 or decrypt data from a read stream
 and pass the decrypted data to the write stream.
 So transform streams are an essential part of the stream family.
 */
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { createServer } = require("http"); | |
const { createReadStream, stat } = require("fs"); | |
const { promisify } = require("util"); | |
const fileInfo = promisify(stat); | |
const file = "./Ex_Files_Advanced_NodeJS/Exercise Files/Ch03/powder-day.mp4"; | |
/**
 * Parses a single-range `Range: bytes=...` header against a resource size.
 *
 * Supports `bytes=START-END`, `bytes=START-` and the suffix form `bytes=-N`
 * (last N bytes). An END beyond the resource is clamped to the last byte.
 *
 * @param {string} rangeHeader - raw Range header value
 * @param {number} size - total size of the resource in bytes
 * @returns {{start: number, end: number} | null} inclusive byte positions, or
 *   null when the header is malformed, multi-range, or cannot be satisfied
 */
const parseByteRange = (rangeHeader, size) => {
  const match = /^bytes=(\d*)-(\d*)$/.exec(String(rangeHeader).trim());
  if (!match) return null;
  const [, first, last] = match;
  if (first === "" && last === "") return null;

  let start;
  let end;
  if (first === "") {
    // Suffix range: the final `last` bytes of the resource.
    const suffixLength = Number.parseInt(last, 10);
    if (suffixLength === 0) return null;
    start = Math.max(size - suffixLength, 0);
    end = size - 1;
  } else {
    start = Number.parseInt(first, 10);
    end = last === "" ? size - 1 : Math.min(Number.parseInt(last, 10), size - 1);
  }
  if (start >= size || start > end) return null;
  return { start, end };
};

/**
 * Video server with range-request support.
 *
 * - no Range header        -> 200 with the whole file
 * - satisfiable Range      -> 206 with the requested slice
 * - unsatisfiable/invalid  -> 416
 * - file cannot be stat'ed -> 404
 */
createServer(async (req, res) => {
  let size;
  try {
    ({ size } = await fileInfo(file));
  } catch (err) {
    // An unhandled rejection inside this async handler would otherwise
    // leave the request hanging or crash the process.
    console.error(err);
    res.writeHead(404, { "Content-Type": "text/plain" });
    res.end("Video not found");
    return;
  }

  const baseHeaders = { "Accept-Ranges": "bytes", "Content-Type": "video/mp4" };
  const range = req.headers.range;
  let streamOptions;

  if (range) {
    console.log(range);
    const byteRange = parseByteRange(range, size);
    if (!byteRange) {
      res.writeHead(416, { "Content-Range": `bytes */${size}` });
      res.end();
      return;
    }
    const { start, end } = byteRange;
    res.writeHead(206, {
      ...baseHeaders,
      "Content-Range": `bytes ${start}-${end}/${size}`,
      "Content-Length": end + 1 - start,
    });
    streamOptions = { start, end };
  } else {
    res.writeHead(200, { ...baseHeaders, "Content-Length": size });
  }

  const video = createReadStream(file, streamOptions);
  video.on("error", (err) => {
    // Headers are already sent, so the only option is to abort the response.
    console.error(err);
    res.destroy(err);
  });
  // pipe() does not close the source when the client disconnects mid-stream.
  res.on("close", () => video.destroy());
  video.pipe(res);
}).listen(3000, () => console.log("server - port 3000"));
/**
 * Implementing `range requests` is very important, as some browsers, e.g.
 * Safari, might not even load the video if range requests are not handled.
 *
 * That aside, it lets the client skip the video forward or backward.
 *
 * The 206 status code tells the browser that the response carries partial content.
 *
 * The Accept-Ranges response HTTP header is a marker used by the server to
 * advertise its support of partial requests. The value of this field
 * indicates the unit that can be used to define a range.
 */
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const { createServer } = require("http");
const { createReadStream, stat, createWriteStream } = require("fs");
const path = require("path");
const { promisify, inspect } = require("util");
const multiparty = require("multiparty");
const fileInfo = promisify(stat);
const file = "./Ex_Files_Advanced_NodeJS/Exercise Files/Ch03/powder-day.mp4";

/**
 * Parses a single-range `Range: bytes=...` header against a resource size.
 *
 * Supports `bytes=START-END`, `bytes=START-` and the suffix form `bytes=-N`
 * (last N bytes). An END beyond the resource is clamped to the last byte.
 *
 * @param {string} rangeHeader - raw Range header value
 * @param {number} size - total size of the resource in bytes
 * @returns {{start: number, end: number} | null} inclusive byte positions, or
 *   null when the header is malformed, multi-range, or cannot be satisfied
 */
const parseByteRange = (rangeHeader, size) => {
  const match = /^bytes=(\d*)-(\d*)$/.exec(String(rangeHeader).trim());
  if (!match) return null;
  const [, first, last] = match;
  if (first === "" && last === "") return null;

  let start;
  let end;
  if (first === "") {
    // Suffix range: the final `last` bytes of the resource.
    const suffixLength = Number.parseInt(last, 10);
    if (suffixLength === 0) return null;
    start = Math.max(size - suffixLength, 0);
    end = size - 1;
  } else {
    start = Number.parseInt(first, 10);
    end = last === "" ? size - 1 : Math.min(Number.parseInt(last, 10), size - 1);
  }
  if (start >= size || start > end) return null;
  return { start, end };
};

/**
 * Streams the video file to the client, honouring an optional Range header.
 *
 * Responds 200 with the full file when no Range header is present, 206 with
 * the requested slice for a satisfiable range, 416 for a malformed or
 * unsatisfiable range, and 404 when the file cannot be stat'ed.
 *
 * @param {import("http").IncomingMessage} req
 * @param {import("http").ServerResponse} res
 * @returns {Promise<void>} settles once the response has been started or ended
 */
const responseWithVideo = async (req, res) => {
  let size;
  try {
    ({ size } = await fileInfo(file));
  } catch (err) {
    // Previously a failing stat() rejected this async function with nobody
    // listening, leaving the request without a response.
    console.error(err);
    res.writeHead(404, { "Content-Type": "text/plain" });
    res.end("Video not found");
    return;
  }

  const range = req.headers.range;
  if (!range) {
    res.writeHead(200, {
      "Accept-Ranges": "bytes",
      "Content-Length": size,
      "Content-Type": "video/mp4",
    });
    sendFile(res);
    return;
  }

  console.log(range);
  const byteRange = parseByteRange(range, size);
  if (byteRange === null) {
    res.writeHead(416, { "Content-Range": `bytes */${size}` });
    res.end();
    return;
  }

  const { start, end } = byteRange;
  res.writeHead(206, {
    "Content-Range": `bytes ${start}-${end}/${size}`,
    "Accept-Ranges": "bytes",
    "Content-Length": end + 1 - start,
    "Content-Type": "video/mp4",
  });
  sendFile(res, { start, end });

  // Pipes the file (or a slice of it) into the response with error handling.
  function sendFile(target, options) {
    const video = createReadStream(file, options);
    video.on("error", (err) => {
      // Headers are already sent, so the only option is to abort the response.
      console.error(err);
      target.destroy(err);
    });
    // pipe() does not close the source when the client disconnects mid-stream.
    target.on("close", () => video.destroy());
    video.pipe(target);
  }
};
/**
 * HTTP server with three routes:
 *  - POST /upload : parses a multipart form and stores uploaded files under ./store
 *  - /video       : streams the video via responseWithVideo
 *  - anything else: serves a small HTML upload form
 *
 * The raw request could also be piped directly (req.pipe(res),
 * req.pipe(process.stdout), req.pipe(createWriteStream(...))); multiparty is
 * used here so that fields and files can be told apart and validated first.
 */
createServer((req, res) => {
  if (req.method === "POST" && req.url === "/upload") {
    const form = new multiparty.Form();

    form
      .on("part", (part) => {
        // Plain form fields carry no filename: log them and drain the part.
        // (They were previously also written to ./store/single/undefined.)
        if (!part.filename) {
          console.log({ field: part.name });
          part.resume();
          return;
        }

        // The filename is client-controlled; keep only its last path segment
        // so that names like "../../etc/x" cannot escape the store directory.
        const safeName = path.basename(part.filename);
        if (safeName === "" || safeName === "." || safeName === "..") {
          part.resume();
          return;
        }

        // Single-file destination, general destination, and a filtered
        // destination for files whose name contains "sample".
        const targets = [`./store/single/${safeName}`, `./store/${safeName}`];
        if (safeName.includes("sample")) {
          targets.push(`./store/filter/${safeName}`);
        }

        for (const target of targets) {
          part.pipe(createWriteStream(target)).on("error", (err) => {
            // e.g. missing directory: report it and keep the part draining
            // so the rest of the form can still be parsed.
            console.error(err);
            part.resume();
          });
        }
      })
      .on("error", (err) => {
        // Malformed multipart bodies would otherwise crash the process.
        console.error(err);
        if (!res.headersSent) {
          res.writeHead(400, { "content-type": "text/plain" });
          res.end("Upload failed");
        }
      })
      .on("close", function () {
        console.log("Upload completed!");
        res.writeHead(200, { "content-type": "text/plain" });
        res.end("Received files");
      });

    form.parse(req);
  } else if (req.url === "/video") {
    // responseWithVideo is async; never leave its promise floating.
    responseWithVideo(req, res).catch((err) => {
      console.error(err);
      if (!res.headersSent) {
        res.writeHead(500, { "Content-Type": "text/plain" });
        res.end("Internal server error");
      } else {
        res.destroy(err);
      }
    });
  } else {
    res.writeHead(200, { "Content-Type": "text/html" });
    res.end(`
<form action="/upload" enctype="multipart/form-data" method="post">
<input type="text" name="title"><br>
<input type="text" name="author"><br>
<input type="file" name="upload" multiple="multiple"><br>
<input type="submit" value="Upload">
</form>
`);
  }
}).listen(3000, () => console.log("server - port 3000"));
/**
 * Implementing `range requests` is very important, as some browsers, e.g.
 * Safari, might not even load the video if range requests are not handled.
 *
 * That aside, it lets the client skip the video forward or backward.
 *
 * The 206 status code tells the browser that the response carries partial content.
 *
 * The Accept-Ranges response HTTP header is a marker used by the server to
 * advertise its support of partial requests. The value of this field
 * indicates the unit that can be used to define a range.
 *
 * The multiparty library can parse multipart form requests, both fields and files.
 */
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment