@rluvaton
Last active July 4, 2022 14:32
Stream concurrency sending in order

Output

$ node index.js
[45] map a
[45] map b
[45] map c
[45] filter az
[55] filter bz
[55] filter cz // <- This waits for bz to finish!

[ 'bz', 'cz' ]

With true concurrency (results emitted as they complete, not in input order), the output would be:

$ node index.js
[45] map a
[45] map b
[45] map c
[45] filter az
[45] filter cz // <- This doesn't wait for bz to finish
[55] filter bz

[ 'cz', 'bz' ]
import { Readable } from 'stream';

(async () => {
  const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

  // noinspection ES6RedundantAwait
  const data = await Readable.from([
    { value: 'a', timeout: 10 },
    { value: 'b', timeout: 10000 },
    { value: 'c', timeout: 0 },
  ])
    // Up to 3 map callbacks run at once, but results are emitted in input order.
    .map(async ({ value, timeout }) => {
      console.log(`[${new Date().getSeconds()}] map`, value);
      await sleep(timeout);
      return value + 'z';
    }, { concurrency: 3 })
    .filter((value) => {
      console.log(`[${new Date().getSeconds()}] filter`, value);
      return !value.includes('a');
    })
    .toArray();

  console.log();
  console.log(data);
})().catch((e) => console.error(e)); // the IIFE returns nothing, so there is no result to log here
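
To make the difference between the two outputs concrete, here is a minimal sketch of both behaviours written as plain async generators. `mapOrdered` and `mapUnordered` are hypothetical helpers invented for this illustration; they are not part of Node.js and are not how `Readable.prototype.map` is implemented. The 10000 ms timeout is shortened to 200 ms so the demo finishes quickly.

```javascript
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

// Hypothetical helper: keeps up to `concurrency` calls in flight,
// but always emits results in input order (the head of the queue blocks the rest).
async function* mapOrdered(source, fn, concurrency) {
  const queue = [];
  for await (const item of source) {
    queue.push(Promise.resolve(fn(item)));
    if (queue.length >= concurrency) yield await queue.shift();
  }
  while (queue.length > 0) yield await queue.shift();
}

// Hypothetical helper: same in-flight limit, but emits whichever result settles first.
async function* mapUnordered(source, fn, concurrency) {
  const inFlight = new Set();
  for await (const item of source) {
    // Each task resolves to itself plus its value so it can be removed from the set.
    const task = Promise.resolve(fn(item)).then((value) => ({ task, value }));
    inFlight.add(task);
    if (inFlight.size >= concurrency) {
      const done = await Promise.race(inFlight);
      inFlight.delete(done.task);
      yield done.value;
    }
  }
  while (inFlight.size > 0) {
    const done = await Promise.race(inFlight);
    inFlight.delete(done.task);
    yield done.value;
  }
}

// Drains an async iterable into an array.
async function collect(iterable) {
  const out = [];
  for await (const value of iterable) out.push(value);
  return out;
}

const items = [
  { value: 'a', timeout: 10 },
  { value: 'b', timeout: 200 }, // shortened from 10000 for the demo
  { value: 'c', timeout: 0 },
];
const work = async ({ value, timeout }) => {
  await sleep(timeout);
  return value + 'z';
};

async function demo() {
  console.log(await collect(mapOrdered(items, work, 3)));   // [ 'az', 'bz', 'cz' ]
  console.log(await collect(mapUnordered(items, work, 3))); // [ 'cz', 'az', 'bz' ]
}
demo();
```

In the ordered version `cz` is ready almost immediately but sits behind `bz`, which matches the first output above. In the unordered version nothing waits on `bz`. Since `Readable.from` accepts async iterables, a generator like `mapUnordered` could be wrapped back into a stream if the rest of the pipeline needs one.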