Last active
June 3, 2016 19:44
-
-
Save wishfoundry/ffe4f61c7acd276371d7 to your computer and use it in GitHub Desktop.
An implementation of Rx.ReplaySubject for cujo/most
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var map = require("./collection").map; | |
// Largest integer a double can represent exactly; used as the "unbounded" cap for size and age.
var MAX_SAFE = Math.pow(2, 53) - 1;

/**
 * Replay cache bounded by both item count and item age.
 *
 * Every stored value is stamped with the time it was added. Before any read or after any
 * write the cache is trimmed so that it holds at most `max` values, none of which is
 * `maxInterval` or more time units old (as measured by `nowGetter`).
 *
 * @param {number} max          maximum number of values to retain (falsy/NaN -> 1)
 * @param {number} maxInterval  maximum age of a retained value (falsy/NaN -> effectively unlimited)
 * @param {function} nowGetter  optional clock returning the current time as a number;
 *                              defaults to the wall clock in milliseconds
 * @constructor
 */
function Buffer(max, maxInterval, nowGetter) {
    this.constructor(max, maxInterval, nowGetter);
}

/**
 * Initialises the cache state in a closure and attaches the public methods
 * (`add`, `forEach`, `clearAll`, `getAll`) to the instance.
 *
 * @param {number} max
 * @param {number} maxInterval
 * @param {function} nowGetter
 */
Buffer.prototype.constructor = function(max, maxInterval, nowGetter) {
    max = Math.min((Number(max) || 1), MAX_SAFE);
    maxInterval = Math.min((Number(maxInterval) || MAX_SAFE), MAX_SAFE);

    var getNow = nowGetter || function() {
        return +new Date();
    };

    // Entries are { value, interval } where `interval` is the timestamp of insertion.
    // They are appended in insertion order, so the oldest entry is always at index 0.
    var buffer = [];

    // An entry is still valid while it is strictly younger than the window.
    function isInWindow(interval, now, window) {
        return (now - interval) < window;
    }

    // Ensure the buffer is within its size and time constraints.
    function flush(now) {
        var start = 0;

        // Skip expired entries at the front. The bound check matters: when every entry
        // has expired the scan must stop at the end instead of reading past it.
        while (start < buffer.length && !isInWindow(buffer[start].interval, now, maxInterval)) {
            start++;
        }

        // Additionally drop the oldest survivors if there are more than `max` of them.
        start = Math.max(start, (buffer.length - max));

        if (start > 0) {
            buffer = buffer.slice(start);
        }
    }

    // Append a value stamped with the current time, then trim.
    function enQueue(value) {
        var now = getNow();
        if (buffer.length < MAX_SAFE) {
            buffer.push({
                value   : value,
                interval: now
            });
        }
        flush(now);
    }

    // Invoke `fn` with each currently valid value, oldest first.
    function each(fn) {
        flush(getNow());
        // Use the array's own iterator; calling `each` here would recurse into this function.
        buffer.forEach(function(item) {
            fn(item.value);
        });
    }

    // Return a new array holding the currently valid values, oldest first.
    function values() {
        flush(getNow());
        return buffer.map(function(item) {
            return item.value;
        });
    }

    this.add = enQueue;
    this.forEach = each;
    this.clearAll = function() {
        buffer = [];
    };
    this.getAll = function() {
        return values();
    };
};
module.exports = Buffer; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
var most = require("../vendor/most"); | |
var Buffer = require("./Buffer"); | |
/**
 * A ReplayStream remembers events pushed into it and plays the remembered events back on
 * every newly forked stream before that stream continues with live events.
 *
 * ReplayStreams (aka ReplaySubjects) follow the "Subject" pattern: a Subject is both an
 * observable (observe / forEach / fork) and an observer (add / error / end).
 * https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/replaysubject.md
 *
 * @param {number} maxBuffer    passed to Buffer as its first argument (item limit)
 * @param {number} maxInterval  passed to Buffer as its second argument (age limit)
 * @constructor
 */
function ReplayStream(maxBuffer, maxInterval) {
    this.constructor(maxBuffer, maxInterval);
}

/**
 * Builds the underlying most.js stream plus its cache and attaches the public API to the
 * instance.
 *
 * @param {number} maxItems
 * @param {number} maxInterval
 */
ReplayStream.prototype.constructor = function(maxItems, maxInterval) {
    // Handles into the underlying stream; assigned when the publisher function below runs.
    var pushValue, finish, fail;

    var cache = new Buffer(maxItems, maxInterval);

    var source = most.create(function(add, end, error) {
        // Every value is recorded in the cache before it is emitted.
        pushValue = function(value) {
            cache.add(value);
            add(value);
        };
        finish = end;
        fail = error;

        return function onEnd() {
            // Events arriving after the stream has ended are silently ignored.
            var noop = function noop() {};
            pushValue = noop;
            fail = noop;
            finish = noop;

            // The closure keeps the cache reachable, so empty it explicitly.
            cache.clearAll();
        };
    });

    // Subjects are "hot" by definition: create demand right away so the stream initialises.
    source.drain();

    // Observing forces the stream to start, so the not-yet-observed stream is exposed
    // separately to allow further configuration before subscribing.
    function fork() {
        var replayed = most.from(cache.getAll());
        return replayed.concat(source);
    }

    // Different libraries (e.g. rxjs, kefirjs, baconjs) use different names for the
    // "subscribe" operation, hence the aliases below.
    function subscribe(onNext) {
        var forked = fork();
        return forked.observe(onNext);
    }

    this.observe = subscribe;
    this.forEach = subscribe;

    this.fork = fork;
    this.stream = fork;

    this.add = function(value, unsafeFast) {
        // Emitting in the same tick can cause trouble for some observables, so the default
        // is to defer; `unsafeFast` opts into synchronous emission for performance.
        if (!unsafeFast) {
            setTimeout(function() {
                pushValue(value);
            }, 0);
            return this;
        }

        pushValue(value);
        return this;
    };

    this.error = function(value) {
        fail(value);
        return this;
    };

    this.end = function() {
        finish();
        return this;
    };
};
module.exports = function create(maxBuffer, maxInterval) { | |
return new ReplayStream(maxBuffer, maxInterval); | |
}; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment