-
-
Save jamesholcomb/13b61440ecb2bb95f4092d7eea5cae40 to your computer and use it in GitHub Desktop.
Simple PubSub using KefirJS (version 1)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
( function(root, factory) { | |
root.pubsub = factory(root); | |
}(this, function() { | |
var publishEmitter; | |
var directory; | |
var eventStream; | |
var SubscriptionDefinition = function(stream, callback) { | |
this.stream = stream; | |
this.callback = callback; | |
this.onUnsubscribe = null; | |
this.subscription = stream.observe({ value: function(ev) {callback(ev.data, ev);} }); | |
stream.subscriberCount = stream.subscriberCount ? stream.subscriberCount + 1 : 1; | |
}; | |
SubscriptionDefinition.prototype.unsubscribe = function() { | |
if(typeof this.onUnsubscribe === 'function') { | |
this.onUnsubscribe(); | |
} | |
this.subscription.unsubscribe(); | |
if(!--this.stream.subscriberCount) { | |
delete directory[this.stream._source._channelName].topics[this.stream._binding]; | |
} | |
}; | |
function publish(event) { | |
event.timestamp = new Date().toISOString(); | |
publishEmitter && publishEmitter.emit(event); | |
} | |
function topicRegex(binding) { | |
var prevSegment; | |
pattern = '^' + binding.split('.').map(function mapTopicBinding(segment) { | |
var res = ''; | |
if(!!prevSegment) { | |
res = prevSegment !== '#' ? '\\.\\b' : '\\b'; | |
} | |
if(segment === '#') { | |
res += '[\\s\\S]*'; | |
} else if(segment === '*') { | |
res += '[^.]+'; | |
} else { | |
res += segment; | |
} | |
prevSegment = segment; | |
return res; | |
}).join('') + '$'; | |
return new RegExp(pattern); | |
} | |
function topicComparator(binding) { | |
if(binding.indexOf('#') === -1 && binding.indexOf('*') === -1) { | |
return (function(ev) { return ev.topic === binding; }); | |
} | |
else { | |
var rgx = topicRegex(binding); | |
return (function(ev) { return rgx.test(ev.topic); }); | |
} | |
} | |
function getChannel(name) { | |
if(!directory[name]) { | |
directory[name] = { | |
stream: eventStream.filter(function(ev) { return ev.channel === name; }), | |
topics: {} | |
}; | |
directory[name].stream._channelName = name; | |
} | |
return directory[name]; | |
} | |
function getTopicStream(channelName, binding) { | |
var channel = getChannel(channelName); | |
var cmp = topicComparator(binding); | |
var stream = channel.topics[binding] || (channel.topics[binding] = channel.stream.filter(function(ev) { return cmp(ev); })); | |
stream._binding = binding; | |
return stream; | |
} | |
function subscribe(def) { | |
return new SubscriptionDefinition(getTopicStream(def.channel, def.topic), def.callback); | |
} | |
function addWireTap(callback) { | |
var subscription = eventStream.observe({ value: function(ev) { callback(ev.data, ev); } }); | |
return function() { subscription.unsubscribe() }; | |
} | |
function reset() { | |
directory = {}; | |
publishEmitter && publishEmitter.end(); | |
publishEmitter = null; | |
eventStream = Kefir.stream(function(emitter) { | |
publishEmitter = emitter; | |
return function() { publishEmitter = null; }; | |
}); | |
} | |
function when(defs, onSuccess, onError, options) { | |
var streams = []; | |
var _options = options || {}; | |
defs.forEach(function(def) { | |
streams.push(getTopicStream(def.channel, def.topic)); | |
}); | |
var aligned = Kefir.zip(streams); | |
var limited = _options.once ? aligned.take(1) : aligned; | |
limited.observe({ value: function(data) { onSuccess.apply(this, data); }, error: onError }); | |
} | |
reset(); | |
return { | |
publish: publish, | |
subscribe: subscribe, | |
addWireTap: addWireTap, | |
reset: reset, | |
when: when | |
}; | |
}) ); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment