Skip to content

Instantly share code, notes, and snippets.

@datfinesoul
Last active February 15, 2021 13:29
Show Gist options
  • Save datfinesoul/50e0a78e9e717927316db6e54607daca to your computer and use it in GitHub Desktop.
Save datfinesoul/50e0a78e9e717927316db6e54607daca to your computer and use it in GitHub Desktop.
Node.js Generator Async/Await Concurrency Limit

elaborate-example.js

Shows the use of generators to control concurrency. This example illustrates the following.

  • 3 workers running through the data
  • Each piece of "work" takes 200-2000 milliseconds to run
  • The 5th, 10th and 17th item will throw an exception
  • Workers will exit because the exception is unhandled
worker='1' itemnum='0' item='F'
worker='2' itemnum='1' item='a'
worker='1' itemnum='3' item='s'
worker='3' itemnum='2' item='l'
worker='2' itemnum='4' item='c'
worker='3' itemnum='6' item='e'
worker='3' itemnum='8' item=' '
worker='2' itemnum='7' item='s'
worker='3' itemnum='9' item='Ü'
worker='3' itemnum='11' item='e'
worker='3' itemnum='12' item='n'
worker='3' itemnum='13' item=' '
worker='3' itemnum='14' item='v'
worker='3' itemnum='15' item='o'
worker='3' itemnum='16' item='n'

simple-example.js

Simpler example than above, with added exception handling.

  • 3 workers running through the data
  • Each piece of "work" takes 200 milliseconds to run
  • The 5th, 10th and 17th item will throw an exception
  • The script will finish all 57 items
itemnum='0' item='F'
...
itemnum='16' item='n'
Error: Error at 17:
    at /home/phadviger/code/demo/gist.generator-concurrency/simple-example.js:12:17
itemnum='19' item='y'
...
itemnum='57' item='g'

module.js

Contains a reusable method called mapC which actually performs a map, instead of just using the map method to loop.

const sleep = t => new Promise(rs => setTimeout(rs, t))
function getRandomIntInclusive(min, max) {
min = Math.ceil(min);
max = Math.floor(max);
//The maximum is inclusive and the minimum is inclusive
return Math.floor(Math.random() * (max - min + 1) + min);
};
const NUMBER_OF_WORKERS = 3;
function* getWorkers(index = 1, limit = NUMBER_OF_WORKERS) {
while (index <= 10) {
yield index++;
}
}
async function doWork(numWorkers = 1, assignments, plan) {
const workload = Array.from(assignments).entries();
const workers = getWorkers();
const work = Array(numWorkers).fill(workload).map(async tasks => {
/*
* NOTE: Exceptions from this scope down, don't trickle up and
* need to be explicitly caught. If an exception occurs inside
* of a worker, the worker will stop.
*/
const id = workers.next().value;
for (let [index, task] of tasks) {
await plan(task, id, index);
}
});
return await Promise.allSettled(work);
}
(async () => {
const data = 'Falsches Üben von Xylophonmusik quält jeden größeren Zwerg';
await doWork(NUMBER_OF_WORKERS, data, async (task, id, index) => {
// wait 200 - 2000 milliseconds
await sleep(getRandomIntInclusive(1, 10) * 200)
if ([5, 10, 17].includes(index)) {
throw new Error('test');
}
console.log(`worker='${id}' itemnum='${index}' item='${task}'`)
});
})();
const mapC = async (data, callback, concurrency = 1) => {
const _data = Array.from(data);
const result = new Array(_data.length);
await Promise.allSettled(
new Array(concurrency).fill(_data.entries())
.map(async iterator => {
for (let [index, item] of iterator) {
result[index] = await callback(item, index);
}
})
);
return result;
};
;(async () => {
const sleep = t => new Promise(rs => setTimeout(rs, t));
const data = 'Falsches Üben von Xylophonmusik quält jeden größeren Zwerg';
const results = await mapC(data, async (item, index) => {
try {
await sleep(200);
if ([5, 10, 17].includes(index)) {
throw new Error(`Error at ${index}: ${item}`);
}
console.log(`itemnum='${index}' item='${item}'`);
return item && '-';
} catch (error) {
console.error(error);
return item;
}
}, 20);
console.log(results);
})();
(async () => {
const sleep = t => new Promise(rs => setTimeout(rs, t));
const NUMBER_OF_WORKERS = 3;
const data = 'Falsches Üben von Xylophonmusik quält jeden größeren Zwerg';
const workers = new Array(NUMBER_OF_WORKERS).fill(Array.from(data).entries())
.map(async iterator => {
for (let [index, item] of iterator) {
try {
await sleep(200);
if ([5, 10, 17].includes(index)) {
throw new Error(`Error at ${index}: ${item}`);
}
console.log(`itemnum='${index}' item='${item}'`);
} catch (error) {
console.error(error);
}
}
});
await Promise.allSettled(workers);
})();
@datfinesoul
Copy link
Author

Using Promise.all instead of Promise.allSettled yields much easier fail fast exception handling, but it just depends on how you want to handle failures.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment