Created
January 24, 2015 11:47
-
-
Save storkme/25c38441658141b419f3 to your computer and use it in GitHub Desktop.
Generate a 'sliding window' of values produced by a time-series source.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var Rx = require('rx'); | |
/**
 * Generate a 'sliding window' of the input time series: every time an item enters or leaves the
 * window, the current window content is emitted as an array (oldest item first).
 *
 * Please note that this function assumes that the observable input series is a linear time series,
 * that is: `selector(n) < selector(n+1)`. Expiry always drops the oldest buffered item, so an
 * out-of-order source will evict the wrong items.
 *
 * Items whose timestamp is already outside the window when they arrive are ignored. When the source
 * completes, pending expirations are cancelled and the result completes immediately.
 *
 * Each emitted array is a fresh snapshot; it is never mutated after being handed to observers.
 *
 * @param {number} windowSizeMs - size of the sliding window in ms
 * @param {function} [selector] - selector function that obtains the unix timestamp value (ms) for a
 *                                given item; defaults to the scheduler's current time on arrival
 * @param {Rx.Scheduler} [scheduler] - optional scheduler (defaults to `Rx.Scheduler.timeout`)
 * @returns {Rx.Observable<Array<T>>} observable of window snapshots
 */
Rx.Observable.prototype.slidingWindow = function (windowSizeMs, selector, scheduler) {
    if (!Rx.helpers.isScheduler(scheduler)) {
        scheduler = Rx.Scheduler.timeout;
    }
    //if no selector is supplied, use a default one that assigns the current time to the item
    if (typeof selector !== 'function') {
        selector = function () {
            return scheduler.now();
        };
    }

    var self = this;

    return Rx.Observable.create(function (obs) {
        var disposables = new Rx.CompositeDisposable(),
            buf = [];

        disposables.add(self.subscribe(
            function (next) {
                var t;
                try {
                    t = selector(next);
                } catch (e) {
                    //a failing user selector is reported through the error channel, like built-in operators do
                    obs.onError(e);
                    return;
                }

                if (t > scheduler.now() - windowSizeMs) {
                    //if the new value is in our window, add it. Wrap it in an array so that an item
                    //which is itself an array is stored as ONE entry instead of being flattened.
                    buf = buf.concat([next]);
                    //emit the new buffer
                    obs.onNext(buf);

                    //schedule a function to remove the oldest value when this value is due to expire.
                    var removeAt = t + windowSizeMs;
                    //the holder is registered before scheduling so the action can safely release it,
                    //even if a scheduler happens to run the action synchronously.
                    var expiry = new Rx.SingleAssignmentDisposable();
                    disposables.add(expiry);
                    expiry.setDisposable(scheduler.scheduleWithAbsolute(removeAt, function () {
                        //build a new array rather than splicing: the previous one was already emitted
                        buf = buf.slice(1);
                        obs.onNext(buf);
                        //drop the finished timer so the composite does not grow without bound
                        disposables.remove(expiry);
                    }));
                }
            },
            obs.onError.bind(obs),
            function () {
                //cancel outstanding expirations before signalling completion
                disposables.dispose();
                obs.onCompleted();
            }));

        return disposables;
    });
};
// Demo / smoke test: push an endless counter through slidingWindow and log the window size.

// The generator never terminates.
function keepGoing() {
    return true;
}

// Advance the counter state by one.
function nextCount(count) {
    return count + 1;
}

// The emitted value is the counter state itself.
function asValue(count) {
    return count;
}

// Wait a random whole number of milliseconds (0 to 5000) before producing the next value.
function randomDelayMs(count) {
    var delay = Math.random() * 5000;
    return Math.round(delay);
}

// Report how many items are currently inside the 30 second window.
function reportWindow(result) {
    console.log("there were " + result.length + " items in the last 30s");
}

// Print any error raised by the pipeline.
function reportError(err) {
    console.error(err);
}

// Announce completion (not expected to happen with an endless source).
function reportDone() {
    console.log("done");
}

var source = Rx.Observable.generateWithRelativeTime(0, keepGoing, nextCount, asValue, randomDelayMs);

var windowed = source.slidingWindow(30000);

windowed.subscribe(reportWindow, reportError, reportDone);
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment