Created
January 5, 2017 11:36
-
-
Save antonioaguilar/0b89bb8dbf2c7f9b7254a2a72541a99b to your computer and use it in GitHub Desktop.
Simple PubSub using KefirJS (version 1)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
( function(root, factory) { | |
root.pubsub = factory(root); | |
}(this, function() { | |
var publishEmitter; | |
var directory; | |
var eventStream; | |
var SubscriptionDefinition = function(stream, callback) { | |
this.stream = stream; | |
this.callback = callback; | |
this.onUnsubscribe = null; | |
this.subscription = stream.observe({ value: function(ev) {callback(ev.data, ev);} }); | |
stream.subscriberCount = stream.subscriberCount ? stream.subscriberCount + 1 : 1; | |
}; | |
SubscriptionDefinition.prototype.unsubscribe = function() { | |
if(typeof this.onUnsubscribe === 'function') { | |
this.onUnsubscribe(); | |
} | |
this.subscription.unsubscribe(); | |
if(!--this.stream.subscriberCount) { | |
delete directory[this.stream._source._channelName].topics[this.stream._binding]; | |
} | |
}; | |
function publish(event) { | |
event.timestamp = new Date().toISOString(); | |
publishEmitter && publishEmitter.emit(event); | |
} | |
function topicRegex(binding) { | |
var prevSegment; | |
pattern = '^' + binding.split('.').map(function mapTopicBinding(segment) { | |
var res = ''; | |
if(!!prevSegment) { | |
res = prevSegment !== '#' ? '\\.\\b' : '\\b'; | |
} | |
if(segment === '#') { | |
res += '[\\s\\S]*'; | |
} else if(segment === '*') { | |
res += '[^.]+'; | |
} else { | |
res += segment; | |
} | |
prevSegment = segment; | |
return res; | |
}).join('') + '$'; | |
return new RegExp(pattern); | |
} | |
function topicComparator(binding) { | |
if(binding.indexOf('#') === -1 && binding.indexOf('*') === -1) { | |
return (function(ev) { return ev.topic === binding; }); | |
} | |
else { | |
var rgx = topicRegex(binding); | |
return (function(ev) { return rgx.test(ev.topic); }); | |
} | |
} | |
function getChannel(name) { | |
if(!directory[name]) { | |
directory[name] = { | |
stream: eventStream.filter(function(ev) { return ev.channel === name; }), | |
topics: {} | |
}; | |
directory[name].stream._channelName = name; | |
} | |
return directory[name]; | |
} | |
function getTopicStream(channelName, binding) { | |
var channel = getChannel(channelName); | |
var cmp = topicComparator(binding); | |
var stream = channel.topics[binding] || (channel.topics[binding] = channel.stream.filter(function(ev) { return cmp(ev); })); | |
stream._binding = binding; | |
return stream; | |
} | |
function subscribe(def) { | |
return new SubscriptionDefinition(getTopicStream(def.channel, def.topic), def.callback); | |
} | |
function addWireTap(callback) { | |
var subscription = eventStream.observe({ value: function(ev) { callback(ev.data, ev); } }); | |
return function() { subscription.unsubscribe() }; | |
} | |
function reset() { | |
directory = {}; | |
publishEmitter && publishEmitter.end(); | |
publishEmitter = null; | |
eventStream = Kefir.stream(function(emitter) { | |
publishEmitter = emitter; | |
return function() { publishEmitter = null; }; | |
}); | |
} | |
function when(defs, onSuccess, onError, options) { | |
var streams = []; | |
var _options = options || {}; | |
defs.forEach(function(def) { | |
streams.push(getTopicStream(def.channel, def.topic)); | |
}); | |
var aligned = Kefir.zip(streams); | |
var limited = _options.once ? aligned.take(1) : aligned; | |
limited.observe({ value: function(data) { onSuccess.apply(this, data); }, error: onError }); | |
} | |
reset(); | |
return { | |
publish: publish, | |
subscribe: subscribe, | |
addWireTap: addWireTap, | |
reset: reset, | |
when: when | |
}; | |
}) ); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks for sharing this...Could you add a simple example?