Created
December 31, 2010 13:07
-
-
Save gautamrege/760991 to your computer and use it in GitHub Desktop.
Redis Pubsub Client with event persistence
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'redis' | |
require 'multi_json' | |
class PubSubRedis < Redis | |
def initialize(options = {}) | |
@timestamp = options[:timestamp].to_i || 0 # 0 means -- no backlog needed | |
super | |
end | |
# Add each event to a Sorted Set with the timestamp as the score | |
def publish(channel, message) | |
timestamp = Time.now.to_i | |
zadd(channel, timestamp, MultiJson.encode([channel, message])) | |
super(channel, MultiJson.encode(message)) | |
end | |
# returns the backlog of pending messages [ event, payload ] pairs | |
# We do a union of sorted sets because we need to support wild-card channels. | |
def backlog(channels, &block) | |
return if @timestamp == 0 | |
# Collect the entire set of events with wild-card support. | |
events = channels.collect {|e| keys(e)}.flatten | |
return if not events or events.empty? # no events to process | |
destination = "pending-#{Time.now.to_i}" | |
zunionstore(destination, events) | |
# We want events only after the timestamp so add the (. This ensures that | |
# an event with this timestamp will not be sent. | |
# TODO: We may have a condition where, multiple events for the same timestamp | |
# may be recorded but will be missed out because of the (. | |
messages = zrangebyscore(destination, "(#{@timestamp.to_s}", "+inf") | |
messages.each do |message| | |
event, payload = MultiJson.decode(message) | |
block.call(event, payload) | |
end | |
# cleanup | |
del(destination) | |
end | |
end |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment