Last active
August 4, 2023 02:14
-
-
Save jamesdiacono/e7fbde13a43bca32142af0745cad8803 to your computer and use it in GitHub Desktop.
A pooling requestor.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// The "pool" requestor runs a dynamic collection of requestors in parallel. | |
// It is like parseq.parallel, except that instead of providing an array of
// requestors up front, they are added wun at a time. | |
// The 'pool' function takes an optional 'throttle' parameter that limits how | |
// many requestors are run at wunce. | |
// It returns an object with two methods: | |
// add(requestor, initial_value) | |
// Adds the 'requestor' to the pool. If the pool is not being | |
// throttled, it will be run immediately. Otherwise it will be put on | |
// a wait list. The requestor is passed 'initial_value'. | |
// requestor(callback) -> cancel(reason) | |
// A requestor that completes wunce the pool has no more running | |
// requestors. | |
// If every requestor succeeds, the 'callback' is called with the array | |
// of values produced by the requestors, in the order they were added | |
// to the pool. | |
// If any requestor fails, its reason is passed to the 'callback' and | |
// all other requestors are cancelled. | |
// Any attempt to run this requestor whilst it is already running will | |
// fail. | |
// In the following example, four requestors are added to a pool, each with a | |
// different initial value. Assuming that all four requestors succeed, the | |
// callback will be called with an array containing four values. | |
// const {add, requestor} = pool(2); | |
// add(my_requestor, "apple"); | |
// add(my_requestor, "orange"); | |
// add(my_requestor, "pear"); | |
// add(my_requestor, "plum"); | |
// requestor(function callback(value, reason) { | |
// // ... | |
// }); | |
/*jslint browser */ | |
function pool(throttle) {

// Makes a pool that runs a dynamic collection of requestors in parallel.

// The optional 'throttle' parameter limits how many requestors run at wunce.
// It is honoured only when it is a safe integer of at least 1, otherwise every
// added requestor is started immediately.

// Returns a frozen object holding two functions:
//      add(requestor, initial_value)
//          Starts the requestor (or queues it, when throttled). Throws if
//          'requestor' is not a function.
//      requestor(callback) -> cancel(reason)
//          Registers the callback that receives either the array of values,
//          ordered by insertion, or the reason of the first failure. Throws if
//          'callback' is not a function.

    let nr = 0;             // the index of the next job to be added
    let waiting = [];       // jobs waiting to run, due to throttling
    let running = [];       // the running jobs
    let values = [];        // the values produced by successful requestors
    let callback;           // the callback of 'requestor', if it has been run

    function remove(job) {

// Takes the job out of the 'running' array, if it is there.

        const job_nr = running.indexOf(job);
        if (job_nr >= 0) {
            running.splice(job_nr, 1);
        }
    }

    function reset(reason) {

// Returns the pool to its pristine state and cancels whatever was running.
// The state is cleared before any cancel function is called, so that a cancel
// function that calls back synchronously finds its job already gone from
// 'running' and is ignored.

        const cancelled = running;
        nr = 0;
        waiting = [];
        running = [];
        values = [];
        callback = undefined;
        cancelled.forEach(function (job) {
            if (typeof job.cancel === "function") {
                job.cancel(reason);
            }
        });
    }

    function finish(value, reason) {

// Ends the current run. The pool is reset before the callback is notified, so
// a callback that throws can not leave the pool stuck in the "Already running."
// state, and a callback that adds to the pool starts from a clean slate.

        const the_callback = callback;
        reset(reason);
        if (the_callback !== undefined) {
            if (value === undefined) {
                the_callback(undefined, reason);
            } else {
                the_callback(value);
            }
        }
    }

    function completed(job, value, reason) {

// The job is finished, so it must not be cancelled by a later reset.

        remove(job);
        if (value === undefined) {
            return finish(undefined, reason);
        }
        values[job.nr] = value;

// We are now running wun less job than the throttle allows. Run the next
// job, if wun is waiting.

        const next_job = waiting.shift();
        if (next_job !== undefined) {
            run(next_job);
        }

// If there are no more jobs running or waiting, we are done.

        if (running.length === 0 && waiting.length === 0) {
            return finish(values);
        }
    }

    function run(job) {
        if (
            Number.isSafeInteger(throttle)
            && throttle >= 1
            && running.length >= throttle
        ) {
            return waiting.push(job);
        }

// The job is registered as running before its requestor is started, otherwise
// a requestor that calls back synchronously would be ignored and then occupy a
// slot forever.

        running.push(job);
        try {
            job.cancel = job.requestor(
                function job_callback(value, reason) {

// Proceed only if the job is still running, that is, the pool has not been
// reset since the requestor was started and the job has not already completed.

                    if (running.includes(job)) {
                        completed(job, value, reason);
                    }
                },
                job.initial_value
            );
        } catch (exception) {

// A requestor that throws never started, so it must not occupy a slot.

            remove(job);
            throw exception;
        }
    }

    function add(requestor, initial_value) {
        if (typeof requestor !== "function") {
            throw new Error("Not a requestor.");
        }
        const job = {nr, requestor, initial_value};
        nr += 1;
        return run(job);
    }

    function requestor(the_callback) {
        if (typeof the_callback !== "function") {
            throw new Error("Not a callback.");
        }
        if (callback !== undefined) {
            return the_callback(undefined, new Error("Already running."));
        }

// Nothing has been added since the last reset, so there is nothing to wait for.

        if (nr === 0) {
            return the_callback([]);
        }
        callback = the_callback;

// The 'reset' function doubles as the cancel function.

        return reset;
    }

    return Object.freeze({add, requestor});
}
//debug function dummy(flakiness = 0, cancelable = false) { | |
//debug let id = 1000; | |
//debug return function dummy_requestor(callback, value) { | |
//debug const my_id = id; | |
//debug id += 1; | |
//debug console.log(my_id, "started"); | |
//debug const timer = setTimeout( | |
//debug function () { | |
//debug console.log(my_id, "done"); | |
//debug return ( | |
//debug Math.random() < flakiness | |
//debug ? callback(undefined, "Flake.") | |
//debug : callback("id=" + my_id + ", value=" + value) | |
//debug ); | |
//debug }, | |
//debug Math.random() * 2000 | |
//debug ); | |
//debug if (cancelable) { | |
//debug return function cancel() { | |
//debug clearTimeout(timer); | |
//debug }; | |
//debug } | |
//debug }; | |
//debug } | |
//debug const cancelable = Math.random() < 0.8; | |
//debug console.log("cancelable", cancelable); | |
//debug const {add, requestor} = pool(3); | |
//debug const dummy_requestor = dummy(0.05, cancelable); | |
//debug add(dummy_requestor, 0); | |
//debug add(dummy_requestor, 1); | |
//debug add(dummy_requestor, 2); | |
//debug add(dummy_requestor, 3); | |
//debug add(dummy_requestor, 4); | |
//debug add(dummy_requestor, 5); | |
//debug const cancel = requestor(console.log); | |
//debug setTimeout(function () { | |
//debug console.log("cancel"); | |
//debug cancel("Cancelled."); | |
//debug }, Math.random() * 6000); | |
export default Object.freeze(pool); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment