Created November 9, 2010 16:17
octplane/669309
Simple Pub/Sub system using MongoDB capped collections and tailable cursors, in Ruby
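require 'rubygems'
require 'mongo'

module MongoPubSub
  # Despite its name, this is used as the MongoDB database name;
  # each queue is a capped collection inside that database.
  QUEUES_COLLECTION = 'queues'

  # Raised inside a subscriber block to stop the subscription cleanly.
  class EndSubscriptionException < StandardError; end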
  class Publisher
    def initialize(queue_name, mongo_connection)
      db = mongo_connection[QUEUES_COLLECTION]
      # Create the queue as a capped collection (at most 20 documents) if it does not exist yet.
      # Note: recent MongoDB servers also require a :size option (in bytes) for capped collections.
      unless db.collection_names.include?(queue_name)
        db.create_collection(queue_name, :capped => true, :max => 20)
      end
      @publish_collection = db[queue_name]
    end

    def push(message)
      # Use a microsecond timestamp as _id so that ids grow with time
      document = { '_id' => (Time.now.to_f * 1_000_000).to_i, 'message' => message }
      @publish_collection.save(document)
    end
  end
  class Subscriber
    def initialize(queue_name, mongo_connection)
      @subscribed_collection = mongo_connection[QUEUES_COLLECTION][queue_name]
      # Find the latest event in the collection; all earlier events are ignored.
      # An empty collection has no latest event, so start from 0 in that case.
      latest = @subscribed_collection.find_one({}, :sort => [['$natural', -1]])
      @earliest = latest ? latest['_id'] : 0
    end

    def each
      tail = Mongo::Cursor.new(@subscribed_collection, :tailable => true, :order => [['$natural', 1]])
      loop do
        doc = tail.next_document
        if doc.nil?
          # No event to process. Wait a bit.
          sleep(1)
        elsif doc['_id'].to_i > @earliest
          begin
            yield doc['message']
          rescue EndSubscriptionException
            break
          end
        end
        # Any other event is too old: ignore it.
      end
    end
  end
end
if __FILE__ == $0
  # Simple Pub/Sub test:
  # create a publisher and a subscriber on the 'test' queue,
  # send one message that is ignored, then four valid messages,
  # and end with a special "Exit" message that cleanly stops
  # the subscriber.
  m = Mongo::Connection.new('127.0.0.1', 27017)
  puts "Starting Publisher"
  publisher = MongoPubSub::Publisher.new('test', m)
  # This message should not be displayed
  publisher.push("This message is too old, ignore me.")

  puts "Starting Subscriber"
  n = Mongo::Connection.new('127.0.0.1', 27017)
  s = MongoPubSub::Subscriber.new('test', n)
  subscriber = Thread.new do
    puts "Now waiting for messages"
    s.each do |message|
      puts "Subscriber: received \"#{message}\""
      # Handle the exit message correctly
      if message == "Exit"
        raise MongoPubSub::EndSubscriptionException.new
      end
    end
    puts "Ending Subscriber !"
  end

  ['first event', 'second event', 'third event', 'fourth event', 'Exit'].each do |mes|
    puts "Publisher #{mes}"
    publisher.push(mes)
  end
  subscriber.join
end
How does this recover from pushing too many things into the queue?
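As far as I can tell it does not recover: the queue is created with :capped => true and :max => 20, and a capped collection drops its oldest documents once the cap is reached. A subscriber that falls more than 20 messages behind would simply never see the overwritten ones. Here is a minimal in-memory sketch of that behaviour. CappedBuffer is a hypothetical stand-in that does not talk to MongoDB, and it ignores the byte-size limit that real capped collections also have.

```ruby
# Hypothetical in-memory model of a capped collection created with :max => N.
class CappedBuffer
  def initialize(max)
    @max = max
    @docs = []
  end

  # Append a document; once the cap is reached, the oldest one is dropped.
  def insert(doc)
    @docs.shift if @docs.size >= @max
    @docs << doc
    self
  end

  # Documents in insertion order (what $natural order gives in MongoDB).
  def to_a
    @docs.dup
  end
end

buffer = CappedBuffer.new(20)
25.times { |i| buffer.insert('_id' => i, 'message' => "event #{i}") }
puts buffer.to_a.size          # 20
puts buffer.to_a.first['_id']  # 5, events 0..4 were overwritten
```

If losing messages matters, a larger :max (and size) only makes it less likely; it does not remove the limit.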
Why don't you make this a gem?
Rather than sleeping for a second after receiving a nil response, you can use the await_data option.
When there are no additional documents in the cursor, mongod then waits a few seconds for new documents before sending a response.
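A rough sketch of what that could look like with the driver used in the gist. It assumes the legacy cursor responds to add_option and that the AwaitData flag is bit 5 of the OP_QUERY flags (which the legacy driver should expose as Mongo::Constants::OP_QUERY_AWAIT_DATA). each_awaited_message and AWAIT_DATA_FLAG are hypothetical names, not part of the gist or the driver.

```ruby
# Bit 5 of the OP_QUERY flags is AwaitData in the MongoDB wire protocol.
# Assumption: same value as Mongo::Constants::OP_QUERY_AWAIT_DATA.
AWAIT_DATA_FLAG = 1 << 5

# Hypothetical helper: yields each message read from a tailable cursor.
# `cursor` is anything that responds to add_option and next_document,
# such as the Mongo::Cursor built in Subscriber#each.
def each_awaited_message(cursor)
  cursor.add_option(AWAIT_DATA_FLAG)
  loop do
    doc = cursor.next_document
    # A nil here means the server already waited and found nothing,
    # so ask again right away instead of sleeping.
    next if doc.nil?
    yield doc['message']
  end
end
```

In Subscriber#each this would replace the sleep(1) branch; the check against @earliest would still be needed and is left out here. The caller can stop the loop with break from its block. If the cursor has died (for example on an empty collection), this loop would spin, so a real version should check for that.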