Skip to content

Instantly share code, notes, and snippets.

@alexstrat
Created May 15, 2012 13:59
Show Gist options
  • Save alexstrat/2701981 to your computer and use it in GitHub Desktop.
Save alexstrat/2701981 to your computer and use it in GitHub Desktop.
Iterative map/reduce
var Deferred = require('./deferred');
/**
 * Iterative map/reduce process built on top of Deferred.
 *
 * Each consumable given to start() (or fed later through the reducer's
 * next() callback) is handed to the map function. Every defined map result
 * is folded into the current reduce value by the reduce function. When no
 * map call is in flight any more, or when a mapper calls end(), the finish
 * function is invoked exactly once and is expected to resolve or reject
 * the deferred.
 */
var IterativeMapReduce = module.exports = Deferred.extend({

  initialize: function() {
    this.supr();

    // Number of launched map calls whose done() has not been called yet.
    // Must start at 0: incrementing an undefined counter yields NaN, and
    // the "nothing left in flight" check would then never succeed.
    this._mapOnFly = 0;

    // Set once the finish function has been invoked.
    this._finished = false;

    // Default finish function: resolve with the current reduce result.
    this.finish_function = function(end_value, reducer_result) {
      this.resolve(reducer_result);
    };
  },

  /**
   * Easy setter for the map function.
   *
   * A map function takes 3 arguments that have to be processed:
   *   {*}        to_map - the consumable of the map process
   *   {Function} done   - the function to call with the map result
   *   {Function} [end]  - call this function to end the map/reduce process
   *
   * The done() function should be called exactly once: if you forget to
   * call it, the process will never end (unless you call the end function).
   * Extra calls to done() are ignored. Calling done() without a value
   * (undefined) skips the reduce step for this consumable.
   *
   * Pass a parameter to the end() function if you want to retrieve it
   * in the finish function.
   *
   * @param {Function} map_function - the map function
   * @return {IterativeMapReduce} this, to allow chaining
   */
  map: function(map_function) {
    this.map_function = map_function;
    return this;
  },

  /**
   * Easy setter for the reduce function.
   *
   * A reduce function takes 4 arguments that have to be processed:
   *   {*}        previous_value - the value passed to done() by the last
   *                               invocation of reduce (or the initial value)
   *   {*}        to_reduce      - value to reduce
   *   {Function} done           - the function to call with the reduce result
   *   {Function} [next]         - call this function to feed the mapper with
   *                               a fresh consumable: this is the way to iterate
   *
   * next() should be called before the reduce function returns: the map
   * call that triggered this reduce is still counted as in flight during
   * that time, so the process cannot finish in between.
   *
   * @param {Function} reduce_function - the reduce function
   * @param {*} init_value - initial reduce value
   * @return {IterativeMapReduce} this, to allow chaining
   */
  reduce: function(reduce_function, init_value) {
    this.reduce_function = reduce_function;
    this._currentReduceResult = init_value;
    return this;
  },

  /**
   * Easy setter for the finish function.
   * The finish function is called at the end of the iterative process
   * (no more map process in flight, or call to end()). It should resolve
   * or reject the deferred. It is called with the instance as `this`.
   *
   * This function takes 2 arguments:
   *   {*} value         - value passed to end() if any, or null
   *   {*} reduce_result - the current reduce result
   *
   * @param {Function} finish_function - the finish function
   * @return {IterativeMapReduce} this, to allow chaining
   */
  finish: function(finish_function) {
    this.finish_function = finish_function;
    return this;
  },

  /**
   * Start the iterative map/reduce with the given array of map
   * consumables. An empty array finishes the process immediately.
   *
   * @param {Array} array - initial consumables, one map call is launched
   *                        for each of them
   * @return {IterativeMapReduce} this, to allow chaining
   */
  start: function(array) {
    var that = this;

    // Hold one extra "in flight" token while seeding. Without it, a map
    // function calling done() synchronously would bring the counter back
    // to 0 and finish the process before the remaining seeds are launched.
    // Releasing the token also covers the empty-array case.
    this._mapOnFly++;
    array.forEach(function(to_map) {
      that._launchMap(to_map);
    });
    this._releaseMap();

    return this;
  },

  /**
   * Tell whether the process is over: either the finish function has
   * already been invoked, or the deferred reports itself as completed.
   *
   * `isCompleted` is expected to come from Deferred, which is not visible
   * from this file: it is handled both as a method and as a plain property.
   *
   * @return {Boolean} true when nothing should be launched any more
   */
  _isOver: function() {
    if (this._finished) {
      return true;
    }
    var completed = this.isCompleted;
    if (typeof completed === 'function') {
      return Boolean(completed.call(this));
    }
    return Boolean(completed);
  },

  /**
   * Invoke the finish function with the given end value and the current
   * reduce result. Does nothing if the process is already over, so the
   * finish function runs at most once.
   *
   * @param {*} end_value - value handed to end(), or null
   */
  _finish: function(end_value) {
    if (this._isOver()) {
      return;
    }
    this._finished = true;
    this.finish_function(end_value, this._currentReduceResult);
  },

  /**
   * Account for one map call being over and finish the process when
   * nothing is left in flight.
   */
  _releaseMap: function() {
    this._mapOnFly--;
    if (this._mapOnFly === 0) {
      this._finish(null);
    }
  },

  /**
   * Launch the map function on one consumable.
   *
   * @param {*} to_map - the consumable handed to the map function
   */
  _launchMap: function(to_map) {
    if (this._isOver()) {
      return;
    }

    var that = this;
    var doneCalled = false;

    var done = function(map_result) {
      // Protection: only the first call counts, so that the in-flight
      // counter cannot be decremented twice for the same map call.
      if (doneCalled) {
        return;
      }
      doneCalled = true;

      // Reduce BEFORE releasing this map call: maps launched through
      // next() during the reduce are then counted before the counter
      // can reach 0.
      if (typeof map_result !== 'undefined') {
        that._launchReduce(map_result);
      }
      that._releaseMap();
    };

    var end = function(value) {
      // The finish function is documented to receive null when end()
      // is called without a value.
      that._finish(typeof value === 'undefined' ? null : value);
    };

    this._mapOnFly++;
    this.map_function.call(this, to_map, done, end);
  },

  /**
   * Launch the reduce function on one map result.
   *
   * @param {*} to_reduce - the map result to fold into the reduce value
   */
  _launchReduce: function(to_reduce) {
    if (this._isOver()) {
      return;
    }

    var that = this;
    var doneCalled = false;

    var done = function(reduce_result) {
      // Protection: only the first call stores the reduce result.
      if (doneCalled) {
        return;
      }
      doneCalled = true;
      that._currentReduceResult = reduce_result;
    };

    var next = function(to_map) {
      that._launchMap(to_map);
    };

    this.reduce_function.call(this, this._currentReduceResult, to_reduce, done, next);
  }
});
// Usage example: iterative value lookup starting from two bootstrap peers.
//
// The calls are written as separate statements rather than a chain so the
// example does not depend on what map()/reduce()/finish()/start() return.

// NOTE(review): nothing in this snippet ever adds a peer to `queried`, so
// the contains() check below only filters if it is populated elsewhere —
// confirm whether peers should be recorded here before being queried.
var queried = new PeerArray();

var lookup = new IterativeMapReduce();

lookup.map(function(peer, done, end) {
  // Already queried: report "no result" so the reduce step is skipped.
  if (queried.contains(peer)) {
    return done();
  }

  var rpc = new FindValueRPC(peer, TARGET);
  // First callback ends the whole process, second one feeds the reducer.
  // Presumably the RPC resolves with the found value and rejects with the
  // peers to try next — confirm against FindValueRPC.
  rpc.then(end, done);
});

// NOTE(review): `pickOutFirts` and `XORSortedpeerArray` look like possible
// misspellings (pickOutFirst / XORSortedPeerArray); they are left untouched
// because their definitions are not visible here — verify before running.
lookup.reduce(function(peers, new_peers, done, next) {
  peers.add(new_peers);
  // Feed the mapper with the first globals.K picked-out peers.
  new_peers.pickOutFirts(globals.K)
           .forEach(next);
  done(peers);
}, new XORSortedpeerArray());

lookup.finish(function(end_value, reducer_result) {
  // A truthy end value means a mapper called end() with a result.
  if (end_value) {
    this.resolve(end_value);
  } else {
    this.reject(reducer_result);
  }
});

lookup.start([bootstrap1, bootstrap2]);

lookup.then(function(res) {
  console.log(res);
});
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment