Created
October 8, 2015 23:00
-
-
Save rafaeljesus/014bbf869cacb646fd7e to your computer and use it in GitHub Desktop.
Using continuation passing style with ES6 generators along with node's callback style functions as an alternative to promises for escaping callback hell.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
const GeneratorFunction = function*(){}.constructor; | |
const GeneratorFunctionPrototype = GeneratorFunction.prototype; | |
const GeneratorPrototype = GeneratorFunctionPrototype.prototype; | |
const slice = Array.prototype.slice; | |
const concat = Array.prototype.concat; | |
const toString = Object.prototype.toString; | |
/** | |
* A user callback function that is raised when the result of an async function | |
* is available. | |
* | |
* @param Any error | |
* If truthy indicates an error result. Will be thrown from the | |
* generator. | |
* @param Any result | |
* If the callbaxk is successful, will be the yielded value. | |
*/ | |
function Callback(error, result){} | |
/** | |
* The type of function that must be yielded from a generator being run. | |
* | |
* @param Callback callback | |
* The function which will handle the error or result of the | |
* continuation. | |
*/ | |
function Continuation(callback){} | |
/** | |
* A GeneratorFunction where every value yielded must either itself be a | |
* Generator or a Continuation. | |
* | |
* @return Generator | |
*/ | |
function Continuable(/*...args*/){} | |
/** | |
* A no-operation function. | |
*/ | |
const noop = function(){}; | |
/** | |
* Coerce a value to an object. | |
* | |
* @param any value | |
* The value to coerce. | |
* @return Object | |
*/ | |
const toObject = function(value){ | |
if (value == null) { | |
throw new TypeError('Cannot coerce to object.'); | |
} | |
return Object(value); | |
}; | |
/** | |
* Coerce a value to an unsigned 32 bit integer; | |
* | |
* @param any value | |
* The value to coerce. | |
* @return Number | |
*/ | |
const toUint32 = function(value){ | |
return value >>> 0; | |
}; | |
/** | |
* Wrap a function so it is only called the first time the wrapper is called. | |
* | |
* @param Function fn | |
* The function to wrap. | |
* @return Function | |
* The wrapper. | |
*/ | |
const once = function(fn){ | |
return function(){ | |
if (fn) { | |
const f = fn; | |
fn = null; | |
return f.apply(this, arguments); | |
} | |
}; | |
}; | |
/** | |
* Check whether a given value is a GeneratorFunction. | |
* | |
* @param Any value | |
* The value to check. | |
* @return Boolean | |
*/ | |
const isGeneratorFunction = function(value){ | |
return value instanceof GeneratorFunction | |
|| typeof value === 'function' | |
&& !!value.constructor | |
&& value.constructor.name === 'GeneratorFunction'; | |
}; | |
/** | |
* Check whether a given value is a Generator. | |
* | |
* @param Any value | |
* The value to check. | |
* @return Boolean | |
*/ | |
const isGenerator = function(value){ | |
return toString.call(value) === '[object Generator]'; | |
}; | |
// `send` was removed after an early iteration in V8's implementation | |
const send = 'send' in GeneratorPrototype ? 'send' : 'next'; | |
/** | |
* Uses continuation passing style to iterate through each yield in a Generator. | |
* | |
* @param Generator generator | |
* The generator object to pump. | |
* @param Continuation continuation | |
* The continuation which will be passed the callback that dispatches to | |
* the generator. | |
*/ | |
const pump = function(generator, continuation){ | |
if (typeof continuation !== 'function') { | |
throw new TypeError("Yielded a non-function"); | |
} | |
continuation(function(err, result){ | |
if (err) { | |
generator.throw(err); | |
} else { | |
const next = generator[send](result).value; | |
next && pump(generator, next); | |
} | |
}); | |
}; | |
/** | |
* Handles either a Continuable function or a Generator. If passed a Generator a | |
* nested event loop is run to completion to obtain the final value. | |
* | |
* @param Continuable|Continuation | |
* The thing to pump a single turn. | |
* @param Callback cb | |
* The Callback to be passed to the Continuable. | |
*/ | |
const resolve = function(value, cb){ | |
if (typeof value === 'function') { | |
value(cb); | |
} else if (isGenerator(value)) { | |
run(function*(){ | |
try { | |
cb(null, yield* value); | |
} catch (e) { | |
cb(e); | |
} | |
}); | |
} else { | |
throw new TypeError('Must return a Continuation or Continuable'); | |
} | |
}; | |
/** | |
* Creates a generator from a generator function and begins executing it. | |
* | |
* @param GeneratorFunction generatorFn | |
* The generator function to execute. Every yielded value should be a | |
* Continuable. | |
*/ | |
const run = exports.run = function run(generatorFn){ | |
const generator = generatorFn(); | |
const first = generator.next(); | |
if (!first.done) { | |
pump(generator, first.value); | |
} | |
}; | |
/** | |
* Wrap an async function as a GeneratorFunction so it can be used with | |
* delegating yield (yield*). | |
* | |
* @param Function fn | |
* Function to wrap. This function must accept a Callback as its last | |
* parameter. | |
* @return Continuable | |
* Wrapped version of the function that can be yielded to in a | |
* generator executed using `run`. | |
*/ | |
const wrap = exports.wrap = function wrap(fn){ | |
return function*(){ | |
const args = slice.call(arguments); | |
const receiver = this; | |
return yield function(cb){ | |
fn.apply(receiver, args.concat(cb)); | |
}; | |
}; | |
}; | |
const _sleep = wrap(function(ms, cb){ | |
const start = Date.now(); | |
setTimeout(function(){ | |
cb(null, Date.now() - start); | |
}, ms); | |
}); | |
/** | |
* Helper that can be used to pause execution. | |
* | |
* @param Number ms | |
* Time to pause. | |
* @return Continuation | |
* Function that can be be yielded to in a generator executed using | |
* `run`. | |
*/ | |
exports.sleep = function sleep(ms){ | |
return _sleep(ms); | |
}; | |
const _tick = wrap(function(){ | |
if (typeof process === 'undefined') { | |
return function(cb){ | |
setTimeout(cb, 0); | |
}; | |
} | |
return process.nextTick; | |
}()); | |
/** | |
* Helper that waits until the next event loop tick. | |
* | |
* @return Continuation | |
* Function that can be be yielded to in a generator executed using | |
* `run`. | |
*/ | |
exports.tick = function tick(){ | |
return _tick(); | |
}; | |
/** | |
* Uses a Continuable function to sequentially map an array of values. | |
* | |
* @param Array array | |
* The set of values to map over. | |
* @param Continuable cb | |
* The function to be applied to each value. | |
* @param Any [receiver] | |
* The |this| value in the callback. | |
* @return Array | |
* The mapped array. | |
*/ | |
exports.forEach = function* forEach(array, itemcb, receiver){ | |
const obj = toObject(array); | |
const len = toUint32(obj.length); | |
for (let i = 0; i < len; i++) { | |
if (i in obj) { | |
yield* itemcb.call(receiver, obj[i], i, array); | |
} | |
} | |
}; | |
/** | |
* Uses a Continuable function to sequentially map an array of values. | |
* | |
* @param Array array | |
* The set of values to map over. | |
* @param Continuable itemcb | |
* The function to be applied to each value. | |
* @param Any [receiver] | |
* The |this| value in the callback. | |
* @return Array | |
* The mapped array. | |
*/ | |
exports.map = function* map(array, itemcb, receiver){ | |
const obj = toObject(array); | |
const len = toUint32(obj.length); | |
const result = new Array(len); | |
for (let i = 0; i < len; i++) { | |
if (i in obj) { | |
result[i] = yield* itemcb.call(receiver, obj[i], i, array); | |
} | |
} | |
return result; | |
}; | |
/** | |
* Uses a Continuable function to sequentially filter a set of values. | |
* | |
* @param Array array | |
* The set of values to map over. | |
* @param Continuable itemcb | |
* The function to be applied to each value. | |
* @param Any [receiver] | |
* The |this| value in the callback. | |
* @return Array | |
* The filtered array. | |
*/ | |
exports.filter = function* filter(array, itemcb, receiver){ | |
const obj = toObject(array); | |
const len = toUint32(obj.length); | |
const result = []; | |
for (let i = 0; i < len; i++) { | |
const item = obj[i]; | |
if (i in obj && (yield* itemcb.call(receiver, item, i, array))) { | |
result[result.length] = item; | |
} | |
} | |
return result; | |
}; | |
/** | |
* Uses a Continuable function to reduce a set of values. | |
* | |
* @param Array array | |
* The set of values to map over. | |
* @param Continuable itemcb | |
* The function to be applied to each value. | |
* @param Any [receiver] | |
* The |this| value in the callback. | |
* @return Any | |
* The final accumulated value. | |
*/ | |
exports.reduce = function* reduce(array, itemcb, initial){ | |
const obj = toObject(array); | |
const len = toUint32(obj.length); | |
let start = arguments.length < 3 ? 1 : 0; | |
let accum = start ? obj[0] : initial; | |
for (let i = start; i < len; i++) { | |
if (i in obj) { | |
accum = yield* itemcb(accum, obj[i], array); | |
} | |
} | |
return accum; | |
}; | |
/** | |
* Returns true the first time a Continuable function returns a truthy value | |
* against a set of values, otherwise returns false. | |
* | |
* @param Array array | |
* The set of values to map over. | |
* @param Continuable itemcb | |
* The function to be applied to each value. | |
* @param Any [receiver] | |
* The |this| value in the callback. | |
* @return Boolean | |
*/ | |
exports.some = function* some(array, itemcb, receiver){ | |
const obj = toObject(array); | |
const len = toUint32(obj.length); | |
for (let i = 0; i < len; i++) { | |
if (i in obj && (yield* itemcb.call(receiver, obj[i], i, array))) { | |
return true; | |
} | |
} | |
return false; | |
}; | |
/** | |
* Returns false the first time a Continuable function returns a falsey value | |
* against a set of values, otherwise returns true. | |
* | |
* @param Array array | |
* The set of values to map over. | |
* @param Continuable itemcb | |
* The function to be applied to each value. | |
* @param Any [receiver] | |
* The |this| value in the callback. | |
* @return Boolean | |
*/ | |
exports.every = function* every(array, itemcb, receiver){ | |
const obj = toObject(array); | |
const len = toUint32(obj.length); | |
for (let i = 0; i < len; i++) { | |
if (i in obj && !(yield* itemcb.call(receiver, obj[i], i, array))) { | |
return false; | |
} | |
} | |
return true; | |
}; | |
/** | |
* Create a parallelized function that works over an array of values. | |
* | |
* @param Function transform | |
* The callback that transforms each value set in the returned array. | |
* @param Function finalize | |
* The callback that transforms the completed array as a whole. | |
* @return Continuable | |
*/ | |
const parallelFunction = function(transform, finalize){ | |
return wrap(function(array, itemcb, receiver, cb){ | |
const obj = toObject(array); | |
const len = toUint32(obj.length); | |
const result = new Array(len); | |
if (!len) { | |
return void cb(null, finalize(result)); | |
} | |
let remaining = len; | |
cb = once(cb); | |
const handle = function(input, index){ | |
return function(err, value){ | |
if (err) { | |
cb(err); | |
} else if (!(index in result)) { | |
result[index] = transform(input, value); | |
if (!--remaining) { | |
cb(null, finalize(result)); | |
} | |
} | |
}; | |
}; | |
for (let i=0; i < len; i++) { | |
if (i in obj) { | |
const item = obj[i]; | |
resolve(itemcb.call(receiver, item, i, array), handle(item, i)); | |
} else { | |
remaining--; | |
} | |
} | |
}); | |
}; | |
const _parallelForEach = parallelFunction(noop, noop); | |
/** | |
* Uses a Continuable function to map an array of values in parallel. | |
* | |
* @param Array array | |
* The set of values to map over. | |
* @param Continuable itemcb | |
* The function to be applied to each value. | |
* @param Any [receiver] | |
* The |this| value in the callback. | |
* @return Array | |
* The filtered array. | |
*/ | |
exports.parallelForEach = function parallelForEach(array, itemcb, receiver){ | |
return _parallelForEach(array, itemcb, receiver); | |
}; | |
const _parallelMap = parallelFunction(function(input, result){ | |
return result; | |
}, function(array){ | |
return array; | |
}); | |
/** | |
* Uses a Continuable function to map an array of values in parallel. | |
* | |
* @param Array array | |
* The set of values to map over. | |
* @param Continuable itemcb | |
* The function to be applied to each value. | |
* @param Any [receiver] | |
* The |this| value in the callback. | |
* @return Array | |
* The filtered array. | |
*/ | |
exports.parallelMap = function parallelMap(array, itemcb, receiver){ | |
return _parallelMap(array, itemcb, receiver); | |
}; | |
const EMPTY = {}; | |
const _parallelFilter = parallelFunction(function(input, result){ | |
return result ? input : EMPTY; | |
}, function(array){ | |
return array.filter(function(item){ | |
return item !== EMPTY; | |
}); | |
}); | |
/** | |
* Uses a Continuable function to filter a set of values in parallel. | |
* | |
* @param Array array | |
* The set of values to map over. | |
* @param Continuable itemcb | |
* The function to be applied to each value. | |
* @param Any [receiver] | |
* The |this| value in the callback. | |
* @return Array | |
* The filtered array. | |
*/ | |
exports.parallelFilter = function parallelFilter(array, itemcb, receiver){ | |
return _parallelFilter(array, itemcb, receiver); | |
}; | |
const _join = wrap(function(items, cb){ | |
const result = []; | |
if (!remaining) { | |
return void cb(null, result); | |
} | |
let remaining = items.length; | |
cb = once(cb); | |
const handle = function(index){ | |
return function(err, value){ | |
if (err) { | |
cb(err); | |
} else if (!(index in result)) { | |
result[index] = value; | |
if (!--remaining) { | |
cb(null, result); | |
} | |
} | |
}; | |
}; | |
for (let i=0, item; item = items[i]; i++) { | |
resolve(item, handle(i)); | |
} | |
}); | |
/** | |
* Take multiple Continuables and combine them into a single Continuable | |
* that returns an array of the completed values, or errors if any error. | |
* | |
* @param ...Continuable args | |
* Any amount of Continuables or arrays of Continuables to join. | |
* @return Continuable | |
* The combined Continuable which will yield the results as an array. | |
*/ | |
exports.join = function join(/* ...args */){ | |
return _join(concat.apply([], arguments)); | |
}; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"use strict"; | |
const path = require('path'); | |
const gen = require('../'); | |
function wrapAll(obj, names){ | |
const out = {}; | |
names.forEach(function(name){ | |
out[name] = gen.wrap(obj[name]); | |
}); | |
return out; | |
} | |
const fs = wrapAll(require('fs'), [ | |
'rename', 'ftruncate', 'truncate', 'chown', 'fchown', 'lchown', 'chmod','fchmod', | |
'lchmod', 'stat', 'lstat', 'fstat', 'link', 'symlink', 'readlink', 'realpath', | |
'unlink', 'rmdir', 'mkdir', 'readdir', 'close', 'open', 'utimes', 'futimes', | |
'fsync', 'write', 'read', 'readFile', 'writeFile', 'appendFile' | |
]); | |
function* fulldir(dir){ | |
return (yield* fs.readdir(dir)).map(function(child){ | |
return path.resolve(dir, child); | |
}); | |
} | |
function* statdir(dir){ | |
const children = yield* fulldir(dir); | |
return yield* gen.map(children, function(filepath){ | |
return fs.stat(filepath); | |
}); | |
} | |
function* sizedir(dir){ | |
return (yield* statdir(dir)).reduce(function(total, child){ | |
return child.isFile() ? child.size + total : total; | |
}, 0); | |
} | |
function* x10(value){ | |
yield* gen.sleep(20); | |
return value * 10; | |
} | |
gen.run(function*(){ | |
console.time('sequential'); | |
console.log(yield* gen.map([1, 2, 3, 4, 5], x10)); | |
console.timeEnd('sequential'); | |
console.time('parallel'); | |
console.log(yield* gen.parallelMap([1, 2, 3, 4, 5], x10)); | |
console.timeEnd('parallel'); | |
console.log(yield* gen.join(fs.stat('.'), fs.stat('..'))); | |
console.log(yield* sizedir('..')); | |
}); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment