Skip to content

Instantly share code, notes, and snippets.

@jamesdiacono
Last active August 4, 2023 02:14
Show Gist options
  • Save jamesdiacono/e7fbde13a43bca32142af0745cad8803 to your computer and use it in GitHub Desktop.
Save jamesdiacono/e7fbde13a43bca32142af0745cad8803 to your computer and use it in GitHub Desktop.
A pooling requestor.
// The "pool" requestor runs a dynamic collection of requestors in parallel.
// It is like parseq.parallel, except that instead of providing any array of
// requestors up front, they are added wun at a time.
// The 'pool' function takes an optional 'throttle' parameter that limits how
// many requestors are run at wunce.
// It returns an object with two methods:
// add(requestor, initial_value)
// Adds the 'requestor' to the pool. If the pool is not being
// throttled, it will be run immediately. Otherwise it will be put on
// a wait list. The requestor is passed 'initial_value'.
// requestor(callback) -> cancel(reason)
// A requestor that completes wunce the pool has no more running
// requestors.
// If every requestor succeeds, the 'callback' is called with the array
// of values produced by the requestors, in the order they were added
// to the pool.
// If any requestor fails, its reason is passed to the 'callback' and
// all other requestors are cancelled.
// Any attempt to run this requestor whilst it is already running will
// fail.
// In the following example, four requestors are added to a pool, each with a
// different initial value. Assuming that all four requestors succeed, the
// callback will be called with an array containing four values.
// const {add, requestor} = pool(2);
// add(my_requestor, "apple");
// add(my_requestor, "orange");
// add(my_requestor, "pear");
// add(my_requestor, "plum");
// requestor(function callback(value, reason) {
// // ...
// });
/*jslint browser */
function pool(throttle) {
let nr = 0; // the index of the next job to be added
let waiting = []; // jobs waiting to run, due to throttling
let running = []; // the running jobs
let values = []; // the values produced by successful requestors
let callback; // the callback of 'requestor', if it has been run
function reset(reason) {
running.forEach(function (job) {
if (typeof job.cancel === "function") {
return job.cancel(reason);
}
});
nr = 0;
waiting = [];
running = [];
values = [];
callback = undefined;
}
function run(job) {
if (
Number.isSafeInteger(throttle)
&& throttle >= 1
&& running.length >= throttle
) {
return waiting.push(job);
}
job.cancel = job.requestor(
function job_callback(value, reason) {
// Proceed only if the pool has not been reset since the requestor was added.
if (running.includes(job)) {
completed(job, value, reason);
}
},
job.initial_value
);
running.push(job);
}
function completed(job, value, reason) {
if (value === undefined) {
if (callback !== undefined) {
callback(undefined, reason);
callback = undefined;
}
return reset(reason);
}
values[job.nr] = value;
// Remove the job from the 'running' array.
const job_nr = running.indexOf(job);
if (job_nr >= 0) {
running.splice(job_nr, 1);
}
// We are now running wun less job than the throttle allows. Run the next
// job, if wun is waiting.
const next_job = waiting.shift();
if (next_job !== undefined) {
run(next_job);
}
// If there are no more jobs running or waiting, we are done.
if (running.length === 0 && waiting.length === 0) {
if (callback !== undefined) {
callback(values);
callback = undefined;
}
return reset();
}
}
function add(requestor, initial_value) {
if (typeof requestor !== "function") {
throw new Error("Not a requestor.");
}
const job = {nr, requestor, initial_value};
nr += 1;
return run(job);
}
function requestor(the_callback) {
if (typeof the_callback !== "function") {
throw new Error("Not a callback.");
}
if (callback !== undefined) {
return the_callback(undefined, new Error("Already running."));
}
if (nr === 0) {
return the_callback([]);
}
callback = the_callback;
return reset;
}
return Object.freeze({add, requestor});
}
//debug function dummy(flakiness = 0, cancelable = false) {
//debug let id = 1000;
//debug return function dummy_requestor(callback, value) {
//debug const my_id = id;
//debug id += 1;
//debug console.log(my_id, "started");
//debug const timer = setTimeout(
//debug function () {
//debug console.log(my_id, "done");
//debug return (
//debug Math.random() < flakiness
//debug ? callback(undefined, "Flake.")
//debug : callback("id=" + my_id + ", value=" + value)
//debug );
//debug },
//debug Math.random() * 2000
//debug );
//debug if (cancelable) {
//debug return function cancel() {
//debug clearTimeout(timer);
//debug };
//debug }
//debug };
//debug }
//debug const cancelable = Math.random() < 0.8;
//debug console.log("cancelable", cancelable);
//debug const {add, requestor} = pool(3);
//debug const dummy_requestor = dummy(0.05, cancelable);
//debug add(dummy_requestor, 0);
//debug add(dummy_requestor, 1);
//debug add(dummy_requestor, 2);
//debug add(dummy_requestor, 3);
//debug add(dummy_requestor, 4);
//debug add(dummy_requestor, 5);
//debug const cancel = requestor(console.log);
//debug setTimeout(function () {
//debug console.log("cancel");
//debug cancel("Cancelled.");
//debug }, Math.random() * 6000);
export default Object.freeze(pool);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment