Skip to content

Instantly share code, notes, and snippets.

@lukesutton
Last active December 28, 2015 22:59
Show Gist options
  • Select an option

  • Save lukesutton/7575195 to your computer and use it in GitHub Desktop.

Select an option

Save lukesutton/7575195 to your computer and use it in GitHub Desktop.
// The beginning of a very simple streams library.
// http://jsfiddle.net/MrkuK/
// Test version of a stream
// A minimal push-based stream. Values are pushed in with `propogate()` and
// handed synchronously to every subscriber, in the order they subscribed.
//
// opts (all optional):
//   autoDispose      - when true, `dispose()` is called as soon as the last
//                      subscriber is removed. Defaults to false.
//   valueOnSubscribe - when true, the most recently propagated value is
//                      remembered and replayed to each new subscriber.
//                      Defaults to false.
function Stream(opts) {
  this._subscribers = [];
  // Records whether `_value` holds a remembered value. A flag is used instead
  // of inspecting the value itself so that 0, false, '' and null are replayed
  // like any other value.
  this._hasValue = false;
  this.opts = _.extend({
    autoDispose: false,
    valueOnSubscribe: false
  }, opts);
}

Stream.prototype = {
  // Helper which propagates a value to all current subscribers. When
  // `valueOnSubscribe` is set the value is also remembered for late
  // subscribers. (The spelling of the method name is part of the public
  // interface and is kept as-is.)
  propogate: function(v) {
    if (this.opts.valueOnSubscribe) {
      this._value = v;
      this._hasValue = true;
    }
    _.each(this._subscribers, function(s) { s(v); });
  },

  // Registers `f` to be called with every propagated value. With
  // `valueOnSubscribe`, `f` is immediately called with the remembered value
  // if one exists. Returns the stream for chaining.
  subscribe: function(f) {
    this._subscribers.push(f);
    if (this.opts.valueOnSubscribe && this._hasValue) {
      f(this._value);
    }
    return this;
  },

  // Removes `f` from the subscribers. With `autoDispose`, disposes the stream
  // once no subscribers remain. Returns the stream for chaining.
  unsubscribe: function(f) {
    this._subscribers = _.without(this._subscribers, f);
    if (this.opts.autoDispose && this._subscribers.length === 0) {
      this.dispose();
    }
    return this;
  },

  // Releases whatever feeds this stream. A no-op by default; derived streams
  // and integrations replace it on the instance.
  dispose: function() {
  },

  // Returns a new stream carrying `f(v)` for every value `v` of this stream.
  map: function(f) {
    return this._specialise(function(stream, v) {
      stream.propogate(f(v));
    });
  },

  // Returns a new stream carrying only the values for which `f(v)` is truthy.
  filter: function(f) {
    return this._specialise(function(stream, v) {
      if (f(v)) { stream.propogate(v); }
    });
  },

  // Returns a new stream carrying every value from both this stream and `s`.
  merge: function(s) {
    var handler = function(stream, v) { stream.propogate(v); };
    return this._specialiseOnStream(s, handler, handler);
  },

  // Similar to .combineLatest(), except that it only fires with each new _pair_.
  // That is, it won't combine a new value with an old value, it'll instead wait
  // for new values on both streams. This is an effective way to pause on two
  // streams until both have updated. Emits `[valueFromThis, valueFromS]`.
  zip: function(s) {
    var valueA;
    var valueB;
    var hasA = false;
    var hasB = false;
    var emitIfPaired = function(stream) {
      if (!(hasA && hasB)) { return; }
      var pair = [valueA, valueB];
      // Reset before propagating so a subscriber that pushes new values
      // synchronously starts a fresh pair.
      valueA = valueB = undefined;
      hasA = hasB = false;
      stream.propogate(pair);
    };
    var handlerA = function(stream, v) {
      valueA = v;
      hasA = true;
      emitIfPaired(stream);
    };
    var handlerB = function(stream, v) {
      valueB = v;
      hasB = true;
      emitIfPaired(stream);
    };
    return this._specialiseOnStream(s, handlerA, handlerB);
  },

  // Pauses the propagation of values through a stream until a new value
  // arrives on the argument stream. Each value on `s` re-emits the latest
  // value seen on this stream; nothing is emitted before the first one.
  sampleBy: function(s) {
    var value;
    var hasValue = false;
    var handlerA = function(stream, v) {
      value = v;
      hasValue = true;
    };
    var handlerB = function(stream) {
      if (hasValue) { stream.propogate(value); }
    };
    return this._specialiseOnStream(s, handlerA, handlerB);
  },

  // Returns a new stream that emits `[latestFromThis, latestFromS]` whenever
  // either stream produces a value, once both have produced at least one.
  combineLatest: function(s) {
    var latestA;
    var latestB;
    var hasA = false;
    var hasB = false;
    var handlerA = function(stream, v) {
      latestA = v;
      hasA = true;
      if (hasB) { stream.propogate([latestA, latestB]); }
    };
    var handlerB = function(stream, v) {
      latestB = v;
      hasB = true;
      if (hasA) { stream.propogate([latestA, latestB]); }
    };
    return this._specialiseOnStream(s, handlerA, handlerB);
  },

  // Returns a new stream that drops a value when it is `_.isEqual` to the
  // value emitted just before it. The first value always passes.
  skipDuplicates: function() {
    var previous;
    var hasPrevious = false;
    return this._specialise(function(stream, v) {
      if (!hasPrevious || !_.isEqual(v, previous)) {
        stream.propogate(v);
        previous = v;
        hasPrevious = true;
      }
    });
  },

  // Returns a new stream fed through `_.throttle`; `ms` and `opts` are passed
  // straight to it.
  throttle: function(ms, opts) {
    var handler = _.throttle(function(stream, v) { stream.propogate(v); }, ms, opts);
    return this._specialise(handler);
  },

  // Returns a new stream fed through `_.debounce`; `ms` and `immediate` are
  // passed straight to it.
  debounce: function(ms, immediate) {
    var handler = _.debounce(function(stream, v) { stream.propogate(v); }, ms, immediate);
    return this._specialise(handler);
  },

  // Builds a derived stream from this one. `handler(stream, v)` is called for
  // every value of this stream and decides what to push into the derived
  // stream. The derived stream auto-disposes, which unsubscribes it from this
  // stream once its own last subscriber leaves.
  _specialise: function(handler) {
    var self = this;
    var stream = new Stream({autoDispose: true, valueOnSubscribe: this.opts.valueOnSubscribe});
    var wrapper = function(v) { handler(stream, v); };
    stream.dispose = function() { self.unsubscribe(wrapper); };
    this.subscribe(wrapper);
    return stream;
  },

  // Like `_specialise`, but derives from two sources: `handlerA` receives the
  // values of this stream and `handlerB` the values of `s`. Disposing the
  // derived stream unsubscribes from both sources.
  _specialiseOnStream: function(s, handlerA, handlerB) {
    var self = this;
    var stream = new Stream({autoDispose: true, valueOnSubscribe: this.opts.valueOnSubscribe});
    var wrapperA = function(v) { handlerA(stream, v); };
    var wrapperB = function(v) { handlerB(stream, v); };
    stream.dispose = function() {
      self.unsubscribe(wrapperA);
      s.unsubscribe(wrapperB);
    };
    this.subscribe(wrapperA);
    s.subscribe(wrapperB);
    return stream;
  }
};
// Example of integrating with jQuery
// jQuery plugin: turns the named event on the matched elements into a Stream.
// Every event object delivered by jQuery is pushed into the returned stream.
// Calling `dispose()` on the returned stream detaches the jQuery handler.
$.fn.stream = function(event) {
  var elements = this;
  var eventStream = new Stream();

  function forward(e) {
    eventStream.propogate(e);
  }

  elements.on(event, forward);
  eventStream.dispose = function() {
    elements.off(event, forward);
  };

  return eventStream;
};
// Generate a stream and subscribe
// Clicks on .button-1, mapped to the element that was clicked.
var stream = $('.button-1')
  .stream('click')
  .map(function(clickEvent) {
    return clickEvent.target;
  });
stream.subscribe(function(target) {
  console.log('Got element', target);
});

// Lets a click through only when the current second is even.
var filtered = stream.filter(function() {
  var seconds = new Date().getSeconds();
  return seconds % 2 === 0;
});
filtered.subscribe(function() {
  console.log('lucky!');
});

// Two independent click streams, first merged and then combined.
var streamA = $('.button-1').stream('click');
var streamB = $('.button-2').stream('click');

streamA
  .merge(streamB)
  .subscribe(function(clickEvent) {
    console.log('Merged event', clickEvent);
  });

streamA
  .combineLatest(streamB)
  .subscribe(function(pair) {
    console.log('Combined', pair);
  });
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment