Last active
November 1, 2022 04:43
-
-
Save bellbind/f6a7ba88e9f1a9d749fec4c9289163ac to your computer and use it in GitHub Desktop.
[ES2017][WHATWG-Streams] Basic examples for WHATWG Streams API (on nodejs)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
// (default) object stream | |
const rs1 = new ReadableStream({ | |
async start(controller) { | |
// called by constructor | |
console.log("[start]"); | |
controller.enqueue("a"); | |
controller.enqueue("b"); | |
controller.enqueue("c"); | |
}, | |
async pull(controller) { | |
// called read when controller's queue is empty | |
console.log("[pull]"); | |
controller.enqueue("d"); | |
controller.close(); // or controller.error(); | |
}, | |
async cancel(reason) { | |
// called when rs.cancel(reason) | |
console.log("[cancel]", reason); | |
}, | |
}); | |
(async () => { | |
// pulling to use the `Reader` of `ReadableStream` | |
const r1 = rs1.getReader(); | |
console.log("[1]", await r1.read()); //=> {value: "a", done: false} | |
console.log("[2]", await r1.read()); //=> {value: "b", done: false} | |
console.log("[3]", await r1.read()); //=> {value: "c", done: false} | |
console.log("[4]", await r1.read()); //=> {value: "d", done: false} | |
console.log("[5]", await r1.read()); //=> {value: undefined, done: true} | |
/* | |
// or as in loop await read() | |
for (let n = await r1.read(); !n.done; n = await r1.read()) { | |
console.log("[value]", n.value); | |
} | |
//*/ | |
})().catch(console.error); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
// as byte array specialized stream | |
const rs1 = new ReadableStream({ | |
type: "bytes", //IMPORTANT | |
async start(controller) { | |
console.log("[start]"); | |
//IMPORTANT: limits enqueue() with some ArrayBufferView | |
controller.enqueue(new Uint32Array([1])); | |
controller.enqueue(new Uint32Array([2])); | |
controller.enqueue(new Uint32Array([3])); | |
}, | |
async pull(controller) { | |
console.log("[pull]"); | |
controller.enqueue(new Uint32Array([4])); | |
controller.close(); | |
}, | |
async cancel(reason) { | |
console.log("[cancel]", reason); | |
}, | |
}); | |
(async () => { | |
// pulling to use the `Reader` of `ReadableStream` | |
const r1 = rs1.getReader(); | |
// read value as Uint8Array regardless of enqueue-ing ArrayBufferView class | |
console.log("[1]", await r1.read()); //=> {value: u8[1,0,0,0], done: false} | |
console.log("[2]", await r1.read()); //=> {value: u8[2,0,0,0], done: false} | |
console.log("[3]", await r1.read()); //=> {value: u8[3,0,0,0], done: false} | |
console.log("[4]", await r1.read()); //=> {value: u8[4,0,0,0], done: false} | |
console.log("[5]", await r1.read()); //=> {value: undefined, done: true} | |
/* | |
// or as in loop await read() | |
for (let n = await r1.read(); !n.done; n = await r1.read()) { | |
console.log("[value]", n.value); | |
} | |
//*/ | |
})().catch(console.error); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
// writable stream | |
//NOTE: writable has no byte-array specific implementations | |
const ws1 = new WritableStream({ | |
async start(controller) { | |
// called by constructor | |
console.log("[start]"); | |
// to do some (asynchronous) initialization | |
}, | |
async write(chunk, controller) { | |
// called when writer.write() | |
console.log("[write]", chunk); | |
}, | |
async close(controller) { | |
console.log("[close]"); | |
}, | |
async abort(reason) { | |
// called when ws.abort(reason) | |
console.log("[abort]", reason); | |
} | |
}); | |
(async () => { | |
console.log(ws1.getWriter); | |
const w1 = ws1.getWriter(); | |
//NOTE: write and close may spawn error | |
for (const v of ["a", "b", "c"]) { | |
await w1.write(v); | |
} | |
await w1.close(); | |
})().catch(console.error); | |
// NOTE on difference between `close()` and `abort()` | |
// - close() is sequentially called after former write()s are finished | |
// - abort() is quickly called even if former write() is not yet finished |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
const {TextEncoder} = require("util"); | |
//NOTE: TransformStream is not yet in current standard spec | |
// transform stream | |
const ts1 = new TransformStream({ | |
async transform(chunk, controller) { | |
console.log("[transform]", chunk); | |
controller.enqueue(new TextEncoder().encode(chunk)); | |
}, | |
async flush(controller) { | |
console.log("[flush]"); | |
controller.close(); | |
}, | |
}); | |
(async () => { | |
const rs = ts1.readable; | |
const ws = ts1.writable; | |
//NOTE: no await because the returned promise is resolved after read() | |
const w = ws.getWriter(); | |
for (const ch of "abc") { | |
w.write(ch); | |
} | |
w.close(); | |
const r = rs.getReader(); | |
for (let n = await r.read(); !n.done; n = await r.read()) { | |
console.log("[value]", n.value); | |
} | |
})().catch(console.error); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
const {TextEncoder} = require("util"); | |
// NOTE: TransformStream is not yet in standard, but | |
// the {readable, writable} object can be used as TransformStream in anywhere | |
// simple implementation for transform stream style readable/writable pair | |
function transformPair(transformer) { | |
let rcontroller = null, wcontroller = null; | |
const readable = new ReadableStream({ | |
type: transformer.type, | |
async start(controller) { | |
rcontroller = controller; | |
}, | |
async cancel(reason) { | |
wcontroller.error(reason); | |
}, | |
}); | |
const writable = new WritableStream({ | |
async start(controller) { | |
wcontroller = controller; | |
}, | |
async write(chunk, controller) { | |
await transformer.transform(chunk, rcontroller); | |
}, | |
async close(controller) { | |
await transformer.flush(rcontroller); | |
await rcontroller.close(); | |
}, | |
async abort(reason) { | |
await rcontroller.error(reason); | |
}, | |
}); | |
return {readable, writable}; | |
} | |
const ts1 = transformPair({ | |
type: "bytes", | |
async transform(chunk, controller) { | |
console.log("[transform]", chunk); | |
controller.enqueue(new TextEncoder().encode(chunk)); | |
}, | |
async flush(controller) { | |
console.log("[flush]"); | |
}, | |
}); | |
(async () => { | |
const rs = ts1.readable; | |
const ws = ts1.writable; | |
const w = ws.getWriter(); | |
for (const ch of "abc") { | |
await w.write(ch); | |
} | |
await w.close(); | |
const r = rs.getReader(); | |
for (let n = await r.read(); !n.done; n = await r.read()) { | |
console.log("[value]", n.value); | |
} | |
})().catch(console.error); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
// (default) object stream | |
const rs1 = new ReadableStream({ | |
async start(controller) { | |
// called by constructor | |
console.log("[start]"); | |
controller.enqueue("a"); | |
controller.enqueue("b"); | |
controller.enqueue("c"); | |
}, | |
async pull(controller) { | |
// called read when controller's queue is empty | |
console.log("[pull]"); | |
controller.enqueue("d"); | |
controller.close(); // or controller.error(); | |
}, | |
async cancel(reason) { | |
// called when rs.cancel(reason) | |
console.log("[cancel]", reason); | |
}, | |
}); | |
(async () => { | |
// rs.tree() duplicate two ReadableStreams | |
const [rsA, rsB] = rs1.tee(); | |
const r1 = rsA.getReader(); | |
console.log("[A 1]", await r1.read()); //=> {value: "a", done: false} | |
console.log("[A 2]", await r1.read()); //=> {value: "b", done: false} | |
console.log("[A 3]", await r1.read()); //=> {value: "c", done: false} | |
console.log("[A 4]", await r1.read()); //=> {value: "d", done: false} | |
console.log("[A 5]", await r1.read()); //=> {value: undefined, done: true} | |
// or as in loop await read() | |
const r2 = rsB.getReader(); | |
for (let n = await r2.read(); !n.done; n = await r2.read()) { | |
console.log("[B value]", n.value); | |
} | |
})().catch(console.error); | |
// NOTE on rsA and rsB cancel() | |
// - rs canceled after both rsA and rsB are cancelled |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
// (default) object stream | |
const rs1 = new ReadableStream({ | |
async start(controller) { | |
// called by constructor | |
console.log("[start readable]"); | |
controller.enqueue("a"); | |
controller.enqueue("b"); | |
controller.enqueue("c"); | |
}, | |
async pull(controller) { | |
// called read when controller's queue is empty | |
console.log("[pull]"); | |
controller.enqueue("d"); | |
controller.close(); // or controller.error(); | |
}, | |
async cancel(reason) { | |
// called when rs.cancel(reason) | |
console.log("[cancel]", reason); | |
}, | |
}); | |
// writable stream | |
const ws1 = new WritableStream({ | |
async start(controller) { | |
// called by constructor | |
console.log("[start writable]"); | |
}, | |
async write(chunk, controller) { | |
// called when writer.write() | |
console.log("[write]", chunk); | |
}, | |
async close(controller) { | |
console.log("[close]"); | |
}, | |
async abort(reason) { | |
console.log("[abort]", reason); | |
} | |
}); | |
(async () => { | |
// rs.pipeTo(ws): readable sends to writable | |
await rs1.pipeTo(ws1); //NOTE: returned Promise waits writable closed | |
console.log("[finished]"); | |
})().catch(console.error); | |
// NOTE on rs.pipeTo(ws, opts) | |
// - {preventClose = false, preventAbort = false, preventCancel = false} = opts | |
// These prevents are just ignoring to source/sink callbacks at the situations: | |
// - rsc.close() => ws.close() | |
// - rsc.error() => ws.abort() | |
// - wsc.error() => rs.cancel() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
const {TextEncoder} = require("util"); | |
//NOTE: TransformSTream is not yet in standard spec | |
// transform stream | |
const ts1 = new TransformStream({ | |
async transform(chunk, controller) { | |
console.log("[transform]", chunk); | |
controller.enqueue(new TextEncoder().encode(chunk)); | |
}, | |
async flush(controller) { | |
console.log("[flush]"); | |
controller.close(); | |
}, | |
}); | |
const rs1 = new ReadableStream({ | |
async start(controller) { | |
// called by constructor | |
console.log("[start]"); | |
controller.enqueue("a"); | |
controller.enqueue("b"); | |
controller.enqueue("c"); | |
}, | |
async pull(controller) { | |
// called read when controller's queue is empty | |
console.log("[pull]"); | |
controller.enqueue("d"); | |
controller.close(); // or controller.error(); | |
}, | |
async cancel(reason) { | |
// called when rs.cancel(reason) | |
console.log("[cancel]", reason); | |
}, | |
}); | |
(async () => { | |
// rs.pipeThrough(ts): rs send to ts.writable | |
// - returns ts.readable | |
const r = rs1.pipeThrough(ts1).getReader(); | |
for (let n = await r.read(); !n.done; n = await r.read()) { | |
console.log("[value]", n.value); | |
} | |
})().catch(console.error); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
// writable stream | |
const ws1 = new WritableStream({ | |
async start(controller) { | |
console.log("[start]"); | |
}, | |
async write(chunk, controller) { | |
console.log("[write]", chunk); | |
await new Promise(r => setTimeout(r, 1000)); // wait to next write | |
}, | |
async close(controller) { | |
console.log("[close]"); | |
}, | |
async abort(reason) { | |
console.log("[abort]", reason); | |
} | |
}, {highWaterMark: 2}); | |
//NOTE: highWaterMark: set queueing count to wait writing (default 1) | |
//- not a queue size limit, but a count to start waiting as ready | |
(async () => { | |
const w1 = ws1.getWriter(); | |
const start = Date.now(); | |
for (const v of "abcd") { | |
//* | |
await w1.ready; //IMPORTANT: wait to add write queue | |
console.log("[ready]", Date.now() - start, "msec"); | |
w1.write(v); // the Promise resolved after finish to write | |
//*/ | |
// compare: the difference with awaiting ready and write as: | |
/* | |
w1.ready; //IMPORTANT: wait to add write queue | |
console.log("[ready]", Date.now() - start, "msec"); | |
await w1.write(v); // the Promise resolved after finish to write | |
//*/ | |
} | |
await w1.close(); | |
})().catch(console.error); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
// example of cancel() | |
const rs1 = new ReadableStream({ | |
async start(controller) { | |
console.log("[start]"); | |
}, | |
async pull(controller) { | |
console.log("[read]"); | |
await new Promise((res, rej) => {}); // pending forever | |
}, | |
async cancel(reason) { | |
console.log("[cancel]", reason); | |
}, | |
}); | |
const r1 = rs1.getReader(); | |
// read await until cancel | |
(async () => { | |
console.log(await r1.read()); //=> {value: undefined, done: true} | |
})().catch(console.error); | |
// cancel after 500msec | |
(async () => { | |
await new Promise(r => setTimeout(r, 500)); | |
r1.cancel("timeout"); | |
})().catch(console.error); | |
//NOTE on `Reader` and lock | |
//- `r = rs.getReader()` create a new reader object that has a lock | |
// - next `r2 = rs.getReader()` spawn TypeError because the rs is locked | |
//- `r.releaseLock()` release its lock | |
// - it can get a new reader by `r2 = rs.getReader()` | |
//- single reader object can do `r.read()`/`r.cancel()`, it limited by the lock | |
// - `rs.cancel()` is also locked by a reader objecct |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//NOTE: (node >=8.8.0): use the option `node --harmony_async_iteration` to run | |
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
// (default) object stream | |
const rs1 = new ReadableStream({ | |
async start(controller) { | |
// called by constructor | |
console.log("[start]"); | |
controller.enqueue("a"); | |
controller.enqueue("b"); | |
controller.enqueue("c"); | |
}, | |
async pull(controller) { | |
// called read when controller's queue is empty | |
console.log("[pull]"); | |
controller.enqueue("d"); | |
controller.close(); // or controller.error(); | |
}, | |
async cancel(reason) { | |
// called when rs.cancel(reason) | |
console.log("[cancel]", reason); | |
}, | |
}); | |
// Async Iterator Proposal: async generator function and for-await-of loop | |
//- https://github.com/tc39/proposal-async-iteration | |
// `AsyncGeneratorFunction` syntax | |
async function* agen1(nextP) { | |
for (let n = await nextP(); !n.done; n = await nextP()) yield n.value; | |
} | |
// - `async function*` returns an object with the `Symbol.aysncIterator` method | |
// - The `Symbol.asyncIterator` method returns an object with `next` method | |
// - The `next` method returns a `Promise` of `{value, done}` | |
// - which has same means with basic ES6 iterator's `{value, done}` | |
function agen2(nextP) { | |
// emulate with basic function as above `async function*` code | |
return { | |
[Symbol.asyncIterator]() {return {next: nextP};} | |
}; | |
} | |
(async () => { | |
// pulling to use the `Reader` of `ReadableStream` | |
const r1 = rs1.getReader(); | |
// for-await-of loop syntax: | |
//* case 1: builtin AsyncGeneratorFunction | |
for await (const v of agen1(_ => r1.read())) console.log(v); | |
//*/ | |
/* case 2: asyncIterator object retured basic function | |
for await (const v of agen2(_ => r1.read())) console.log(v); | |
//*/ | |
/* case 3: inlined asyncIterator object | |
for await (const v of {[Symbol.asyncIterator]() {return {next() { | |
return r1.read(); | |
}};}}) console.log(v); | |
//*/ | |
})().catch(console.error); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// npm i web-streams-polyfill | |
const streams = require("web-streams-polyfill"); | |
const {ReadableStream, WritableStream, TransformStream} = streams; | |
// source for object streams | |
class Producer { | |
constructor(name, max = 10, count = 0) { | |
this.name = name; | |
this.max = max; | |
this.count = count; | |
} | |
async start(controller) { | |
console.log(`[start ${this.name}]`); | |
} | |
async pull(controller) { | |
console.log(`[pull ${this.name}]: ${++this.count}`); | |
controller.enqueue(`${this.name} ${this.count}`); | |
if (this.count === this.max) controller.close(); | |
} | |
async cancel(reason) { | |
console.log(`[cancel ${this.name}]`, reason); | |
} | |
} | |
// sink for object stream | |
class Consumer { | |
constructor(name, max = 100, count = 0) { | |
this.name = name; | |
this.max = max; | |
this.count = count; | |
} | |
async start(controller) { | |
console.log(`[start ${this.name}]`); | |
} | |
async write(chunk, controller) { | |
if (this.count === this.max) { | |
controller.error(`[reach max] ${this.max}`); | |
} else { | |
console.log(`[write ${this.name} ${++this.count}]: ${chunk}`); | |
} | |
} | |
async close(controller) { | |
console.log(`[close ${this.name}]`); | |
} | |
async abort(reason) { | |
console.log(`[abort ${this.name}]: ${reason}`); | |
} | |
} | |
// non-locked pipeTo `Writer` for parallel read-to-write from multi RS | |
function pipeToWriter( | |
rs, w, {preventClose, preventCancel, preventAbort} = {}) { | |
let wactive = true; | |
const r = rs.getReader(); | |
const rclosed = r.closed.then(value => { | |
if (!preventClose && wactive) { | |
wactive = false; | |
return w.close().then(_ => value); | |
} | |
return value; | |
}, error => { | |
if (!preventClose && wactive) { | |
wactive = false; | |
return w.close().then(_ => {throw error;}); | |
} | |
throw error; | |
}); | |
const wclosed = w.closed.then(value => { | |
wactive = false; | |
if (!preventCancel) return r.cancel(). | |
then(_ => r.releaseLock()).then(_ => value); | |
return r.releaseLock().then(_ => value); | |
}, reason => { | |
wactive = false; | |
if (!preventCancel) return r.cancel(reason). | |
then(_ => r.releaseLock()).then(_ => {throw reason;}); | |
return r.reseaseLock().then(_ => {throw reason;}); | |
}); | |
function onReadError(error) { | |
if (!preventAbort) w.abort(error); | |
return {value: undefined, done: true}; | |
} | |
async function loop() { | |
while (true) { | |
await w.ready; | |
const {value, done} = await r.read().catch(onReadError); | |
if (done) break; | |
await w.write(value); | |
} | |
} | |
return Promise.race([loop().catch(err => {}), rclosed, wclosed]); | |
} | |
(async () => { | |
//* case 1: sequential two stream with pipeTo() | |
console.log("X: A1, A2, A3, A4, A5, A6, A7, A8, B1, B2"); | |
const rsA = new ReadableStream(new Producer("A", 8)); | |
const rsB = new ReadableStream(new Producer("B", 8)); | |
const wsX = new WritableStream(new Consumer("X", 10)); | |
// pipeTo locks the writer, so next pipeTo required to await to finish | |
await rsA.pipeTo(wsX, {preventClose: true}); | |
//NOTE: pipeTo fails when writor error | |
await rsB.pipeTo(wsX).catch(err => {}); // write until "B 2" | |
console.log(); | |
//*/ | |
//* case 2: parallel | |
console.log("Y: C1, D1, C2, D2, C3, D3, C4, D4, C5, D5"); | |
const rsC = new ReadableStream(new Producer("C", 8)); | |
const rsD = new ReadableStream(new Producer("D", 8)); | |
const wsY = new WritableStream(new Consumer("Y", 10)); | |
const wY = wsY.getWriter(); | |
await Promise.all([pipeToWriter(rsC, wY), pipeToWriter(rsD, wY)]).catch(console.error); | |
console.log(); | |
//*/ | |
//* case 3: merge single RS with TransformStream as queue | |
console.log("Z: E1, F1, E2, F2, E3, F3, E4, F4, E5, F5"); | |
const rsE = new ReadableStream(new Producer("E", 8)); | |
const rsF = new ReadableStream(new Producer("F", 8)); | |
const ts = new TransformStream(); | |
const tsw = ts.writable.getWriter(); | |
pipeToWriter(rsE, tsw, {preventClose: true}); | |
pipeToWriter(rsF, tsw, {preventClose: true}); | |
const wsZ = new WritableStream(new Consumer("Z", 10)); | |
await ts.readable.pipeTo(wsZ).catch(err => {}); | |
console.log(); | |
//*/ | |
})().catch(console.error); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
My updated submodules and devDependencies version:
npm i bellbind/web-streams-polyfill
(it takes much long time to pull recursive submodules)