Last active
December 28, 2015 22:59
-
-
Save lukesutton/7575195 to your computer and use it in GitHub Desktop.
The beginning of a very simple streams library.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // http://jsfiddle.net/MrkuK/ | |
| // Test version of a stream | |
| function Stream(opts) { | |
| this._subscribers = []; | |
| this.opts = _.extend({ | |
| autoDispose: false, | |
| valueOnSubscribe: false | |
| }, opts); | |
| } | |
| Stream.prototype = { | |
| // Helper which propogates values to subscribers | |
| propogate: function(v) { | |
| if (this.opts.valueOnSubscribe) {this._value = v;} | |
| _.each(this._subscribers, function(s) {s(v);}); | |
| }, | |
| subscribe: function(f) { | |
| this._subscribers.push(f); | |
| if (this.opts.valueOnSubscribe && !_.isEmpty(this._value)) { | |
| f(this._value); | |
| } | |
| return this; | |
| }, | |
| unsubscribe: function(f) { | |
| this._subscribers = _.without(this._subscribers, f); | |
| if (this.opts.autoDispose && _.isEmpty(this._subscribers)) {this.dispose();} | |
| }, | |
| dispose: function() { | |
| }, | |
| map: function(f) { | |
| return this._specialise(function(stream, v) { | |
| stream.propogate(f(v)); | |
| }); | |
| }, | |
| filter: function(f) { | |
| return this._specialise(function(stream, v) { | |
| if (f(v)) {stream.propogate(v);} | |
| }); | |
| }, | |
| merge: function(s) { | |
| var handler = function(stream, v) {stream.propogate(v)} | |
| return this._specialiseOnStream(s, handler, handler); | |
| }, | |
| // Similar to .combineLatest(), except that it only fires with each new _pair_. | |
| // That is, it won't combine a new value with an old value, it'll instead wait | |
| // for new values on both streams. This is an effective way to pause on two | |
| // streams until both have updated. | |
| zip: function(s) { | |
| var valueA; | |
| var valueB; | |
| var handlerA = function(stream, v) { | |
| valueA = v; | |
| if (!_.isEmpty(valueB)) { | |
| stream.propogate([valueA, valueB]) | |
| valueA = valueB = undefined; | |
| } | |
| }; | |
| var handlerB = function(stream, v) { | |
| valueB = v; | |
| if (!_.isEmpty(valueA)) { | |
| stream.propogate([valueA, valueB]) | |
| valueA = valueB = undefined; | |
| } | |
| }; | |
| return this._specialiseOnStream(s, handlerA, handlerB); | |
| }, | |
| // Pauses the propagation of values through a stream until a new value | |
| // arrives on the argument stream. | |
| sampleBy: function(s) { | |
| var value; | |
| var handlerA = function(stream, v) {value = v;}; | |
| var handlerB = function(stream, v) {if (!_.isEmpty(value) {stream.propogate(value)}}; | |
| return this._specialiseOnStream(s, handlerA, handlerB); | |
| }, | |
| combineLatest: function(s) { | |
| var latestA; | |
| var latestB; | |
| var handlerA = function(stream, v) { | |
| latestA = v; | |
| if (!_.isEmpty(latestB)) { | |
| stream.propogate([latestA, latestB]); | |
| } | |
| }; | |
| var handlerB = function(stream, v) { | |
| latestB = v; | |
| if (!_.isEmpty(latestA)) { | |
| stream.propogate([latestA, latestB]); | |
| } | |
| }; | |
| return this._specialiseOnStream(s, handlerA, handlerB); | |
| }, | |
| skipDuplicates: function() { | |
| var previous; | |
| return this._specialise(function(stream, v) { | |
| if (!_.isEqual(v, previous)) { | |
| stream.propogate(v); | |
| previous = v; | |
| } | |
| }); | |
| }, | |
| throttle: function(ms, opts) { | |
| var handler = _.throttle(function(stream, v) {stream.propogate(v);}, ms, opts); | |
| return this._specialise(handler); | |
| }, | |
| debounce: function(ms, immediate) { | |
| var handler = _.debounce(function(stream, v) {stream.propogate(v);}, ms, immediate); | |
| return this._specialise(handler); | |
| }, | |
| _specialise: function(handler) { | |
| var self = this; | |
| var stream = new Stream({autoDispose: true, valueOnSubscribe: this.opts.valueOnSubscribe}); | |
| var wrapper = function(v) {handler(stream, v);}; | |
| stream.dispose = function() {self.unsubscribe(wrapper);}; | |
| this.subscribe(wrapper); | |
| return stream; | |
| }, | |
| _specialiseOnStream: function(s, handlerA, handlerB) { | |
| var self = this; | |
| var stream = new Stream({autoDispose: true, valueOnSubscribe: this.opts.valueOnSubscribe}); | |
| var wrapperA = function(v) {handlerA(stream, v);} | |
| var wrapperB = function(v) {handlerB(stream, v);} | |
| stream.dispose = function() { | |
| self.unsubscribe(wrapperA); | |
| s.unsubscribe(wrapperB); | |
| } | |
| this.subscribe(wrapperA); | |
| s.subscribe(wrapperB); | |
| return stream; | |
| } | |
| }; | |
| // Example of integrating with jQuery | |
| $.fn.stream = function(event) { | |
| var $el = this; | |
| var stream = new Stream(); | |
| var handler = function(e) {stream.propogate(e)}; | |
| stream.dispose = function() {$el.off(event, handler);}; | |
| $el.on(event, handler); | |
| return stream; | |
| }; | |
| // Generate a stream and subscribe | |
| var stream = $('.button-1').stream('click').map(function(e) {return e.target}); | |
| stream.subscribe(function(el) {console.log('Got element', el);}); | |
| var filtered = stream.filter(function(v) {return (new Date()).getSeconds() % 2 == 0;}); | |
| filtered.subscribe(function(v) { | |
| console.log("lucky!"); | |
| }); | |
| var streamA = $('.button-1').stream('click'); | |
| var streamB = $('.button-2').stream('click'); | |
| streamA.merge(streamB).subscribe(function(e) { | |
| console.log('Merged event', e); | |
| }) | |
| streamA.combineLatest(streamB).subscribe(function(v) { | |
| console.log("Combined", v); | |
| }); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment