Last active
May 30, 2016 16:02
-
-
Save boxofrox/41d66d841e37d3827796 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
const R = require('ramda'); | |
const Future = require('ramda-fantasy').Future; | |
// :: Int -> [Future a b] -> Future a [b] | |
// number of workers :: Int | |
// tasks to do :: [Future a b] | |
const batchParallel = R.curry((N, as) => { | |
if (0 === as.length || N < 1) { | |
return Future.of([]); | |
} | |
let idx = 0; // task index. | |
let completed = 0; // number of completed tasks. | |
let n = 0; // number of active tasks. | |
let errorResult = null; // error message of first failed task. | |
let resolved = false; // so we ignore active tasks that complete after an error. | |
const len = as.length; | |
const results = new Array(len); | |
const nextTask = () => idx += 1; | |
const push = () => n += 1; // signal a new active task. | |
const pop = () => { | |
n -= 1; // signal an open slot for a task. | |
completed += 1; // signal one more completed task. | |
}; | |
return new Future((rej, ok) => { | |
const schedule = () => { | |
if (resolved) { | |
return; | |
} | |
else if (errorResult) { | |
resolved = true; | |
return rej(errorResult); // we're done here | |
} | |
else if (N === completed) { | |
resolved = true; // this should be a no-op. | |
return ok(results); // all tasks completed | |
} | |
while (n < N && idx < len) { | |
run(as[idx], idx); // run task idx | |
nextTask(); | |
} | |
}; | |
const run = (task, i) => { | |
push(); | |
task.fork( | |
error => { | |
errorResult = error; | |
pop(); | |
schedule(); | |
}, | |
result => { | |
results[i] = result; | |
pop(); | |
schedule(); | |
} | |
); | |
}; | |
schedule(); | |
}); | |
}); | |
module.exports = batchParallel; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment