Created
May 15, 2012 13:59
-
-
Save alexstrat/2701981 to your computer and use it in GitHub Desktop.
Iterative map/reduce
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var Deferred = require('./deferred'); | |
/**
 * Deferred that drives an iterative map/reduce process.
 *
 * Every consumable handed to start() (or to the reducer's next() callback)
 * is passed to the map function. Each map result is folded into the current
 * reduce value by the reduce function. When no map call is pending any more,
 * or when a mapper calls end(), the finish function is invoked once; it is
 * expected to resolve or reject the deferred.
 *
 * map(), reduce(), finish() and start() return the instance so that calls
 * can be chained.
 */
var IterativeMapReduce = module.exports = Deferred.extend({

  initialize: function() {
    this.supr();

    // Number of launched map calls whose done() has not been invoked yet.
    // It has to start at 0: incrementing an undefined property yields NaN,
    // which never compares equal to 0, so the process would never finish.
    this._mapOnFly = 0;

    // Set as soon as the finish function has been invoked, so that it runs
    // at most once per process.
    this._finished = false;

    // default finish function : resolve with the reduce result
    this.finish_function = function(end_value, reducer_result) {
      this.resolve(reducer_result);
    };
  },

  /**
   * Easy setter for the map function.
   *
   * A map function takes 3 arguments that have to be processed :
   *  {*}        to_map - the consumable of the map process
   *  {Function} done   - the function to call with the map result
   *  {Function} [end]  - call this function to end the map/reduce process
   *
   * The done() function should be called exactly once : if you forget to
   * call it, the process will never end (unless you call the end function).
   * Calls after the first one are ignored. Calling done() without argument
   * completes the map without feeding the reducer.
   *
   * Pass a parameter to the end() function if you want to retrieve it
   * in the finish function.
   *
   * @param  {Function} map_function - the map function
   * @return {IterativeMapReduce} this instance, for chaining
   */
  map: function(map_function) {
    this.map_function = map_function;
    return this;
  },

  /**
   * Easy setter for the reduce function.
   *
   * A reduce function takes 4 arguments that have to be processed :
   *  {*}        previous_value - the value previously returned in the last
   *                              invocation of reduce
   *  {*}        to_reduce      - value to reduce
   *  {Function} done           - the function to call with the reduce result
   *                              (only the first call is taken into account)
   *  {Function} [next]         - call this function to feed the mapper with
   *                              a fresh consumable : this is the way to
   *                              iterate.
   *
   * @param  {Function} reduce_function - the reduce function
   * @param  {*}        init_value      - initial reduce value
   * @return {IterativeMapReduce} this instance, for chaining
   */
  reduce: function(reduce_function, init_value) {
    this.reduce_function = reduce_function;
    this._currentReduceResult = init_value;
    return this;
  },

  /**
   * Easy setter for the finish function.
   * The finish function is called at the end of the iterative process
   * (no more map process pending, or call to end()). It should resolve or
   * reject the deferred. It is called with the instance as `this`.
   *
   * This function takes 2 arguments :
   *  {*} value         - value passed during end() if any, or null
   *  {*} reduce_result - the current reduce result
   *
   * @param  {Function} finish_function - the finish function
   * @return {IterativeMapReduce} this instance, for chaining
   */
  finish: function(finish_function) {
    this.finish_function = finish_function;
    return this;
  },

  /**
   * Start the iterative map/reduce given an array of map consumables.
   * With an empty (or missing) array the finish function is called right
   * away with a null end value and the current reduce result.
   *
   * @param  {Array} array - consumables to feed the map function with
   * @return {IterativeMapReduce} this instance, for chaining
   */
  start: function(array) {
    var that = this;

    // Hold the process open while the initial consumables are launched :
    // a map function calling done() synchronously must not bring the
    // counter down to 0 before its siblings have been launched.
    this._mapOnFly++;
    (array || []).forEach(function(to_map) {
      that._launchMap(to_map);
    });
    this._mapOnFly--;

    this._finishIfIdle();
    return this;
  },

  /**
   * Run the map function on one consumable.
   *
   * @param {*} to_map - the consumable
   */
  _launchMap: function(to_map) {
    if (this._isOver()) return;

    var that = this;
    var called = false;

    var done = function(map_result) {
      // protection : only the first call is taken into account, otherwise
      // the pending counter would be decremented twice for the same map
      if (called) return;
      called = true;

      if (typeof map_result !== 'undefined') {
        that._launchReduce(map_result);
      }

      that._mapOnFly--;
      that._finishIfIdle();
    };

    var end = function(value) {
      // the finish function receives null when end() is called bare
      that._finish(typeof value === 'undefined' ? null : value);
    };

    this._mapOnFly++;
    this.map_function.call(this, to_map, done, end);
  },

  /**
   * Run the reduce function on one map result.
   *
   * @param {*} to_reduce - the map result to fold into the reduce value
   */
  _launchReduce: function(to_reduce) {
    if (this._isOver()) return;

    var that = this;
    var called = false;

    var done = function(reduce_result) {
      // protection : only the first call is taken into account
      if (called) return;
      called = true;
      that._currentReduceResult = reduce_result;
    };

    var next = function(to_map) {
      that._launchMap(to_map);
    };

    this.reduce_function.call(this, this._currentReduceResult, to_reduce, done, next);
  },

  /**
   * Tell whether the process is over, i.e. the finish function has already
   * been invoked or the deferred reports itself as completed.
   *
   * NOTE(review): the Deferred base class is not visible from here, so
   * `isCompleted` is accepted both as a method and as a plain property -
   * confirm which one Deferred actually exposes.
   *
   * @return {Boolean}
   */
  _isOver: function() {
    if (this._finished) return true;

    var completed = this.isCompleted;
    if (typeof completed === 'function') {
      return Boolean(completed.call(this));
    }
    return Boolean(completed);
  },

  /**
   * Invoke the finish function, at most once per process.
   *
   * @param {*} end_value - value given to end(), or null
   */
  _finish: function(end_value) {
    if (this._isOver()) return;

    this._finished = true;
    this.finish_function(end_value, this._currentReduceResult);
  },

  /**
   * Finish the process when no map call is pending any more.
   */
  _finishIfIdle: function() {
    if (this._mapOnFly === 0) {
      this._finish(null);
    }
  }
});
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Usage example : iterative value lookup starting from two bootstrap peers.
//
// NOTE(review): nothing in this snippet ever adds a peer to `queried`, so the
// contains() check below can only match if it is populated elsewhere - confirm.
// NOTE(review): the chain below requires map(), reduce(), finish() and start()
// to return the IterativeMapReduce instance - confirm against the class.
var queried = new PeerArray();

new IterativeMapReduce()
  .map(function(peer, done, end) {
    // Already-queried peer : complete this map without feeding the reducer.
    if (queried.contains(peer)) {
      return done();
    }

    var rpc = new FindValueRPC(peer, TARGET);
    // A resolved RPC ends the whole process with its value; a rejected one
    // hands its value to done(), i.e. to the reducer.
    rpc.then(end, done);
  })
  // NOTE(review): `XORSortedpeerArray` and `pickOutFirts` look like typos
  // (XORSortedPeerArray / pickOutFirst?) - their definitions are not visible
  // here, so the names are kept as written; verify against the library.
  .reduce(function(peers, new_peers, done, next) {
    peers.add(new_peers);
    // Feed the mapper with the first globals.K picked-out peers.
    new_peers.pickOutFirts(globals.K)
             .forEach(next);
    done(peers);
  }, new XORSortedpeerArray())
  .finish(function(end_value, reducer_result) {
    // A truthy end value means a mapper called end() with a result.
    if (end_value) {
      this.resolve(end_value);
    } else {
      this.reject(reducer_result);
    }
  })
  .start([bootstrap1, bootstrap2])
  .then(function(res) {
    console.log(res);
  });
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment