-
-
Save lgs/1052510 to your computer and use it in GitHub Desktop.
A simple Pub/Sub system in Ruby, built on MongoDB capped collections and tailable cursors.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rubygems' | |
require 'mongo' | |
# Minimal publish/subscribe layer on top of MongoDB.
#
# Each queue is a collection created as capped; subscribers follow it with a
# tailable cursor and receive every message published after they subscribed.
module MongoPubSub
  # Key looked up on the connection to reach the object that owns the queue
  # collections (it is asked for +collection_names+ / +create_collection+,
  # i.e. it is used as a database handle despite the constant's name).
  QUEUES_COLLECTION = 'queues'

  # Raised by a subscriber's block to stop Subscriber#each cleanly.
  # Inherits from StandardError (not Exception) so that it behaves like an
  # ordinary application error and a bare +rescue+ can see it.
  class EndSubscriptionException < StandardError; end

  # Writes messages to a named queue.
  class Publisher
    # @param queue_name [String] name of the queue collection
    # @param mongo_connection [Object] connection; indexed with
    #   QUEUES_COLLECTION to reach the queues database
    def initialize(queue_name, mongo_connection)
      queues = mongo_connection[QUEUES_COLLECTION]
      # Create the queue as a capped collection the first time it is used.
      unless queues.collection_names.include?(queue_name)
        queues.create_collection(queue_name, { :capped => true, :max => 20 })
      end
      @publish_collection = queues[queue_name]
      @last_id = 0
    end

    # Publishes one message.
    #
    # The document is stored as { '_id' => <microsecond timestamp>,
    # 'message' => message }.
    #
    # @param message [Object] payload stored under the 'message' key
    # @return [Object] whatever the collection's +save+ returns
    def push(message)
      document = { '_id' => next_id, 'message' => message }
      @publish_collection.save(document)
    end

    private

    # Returns a microsecond timestamp that is strictly greater than the
    # previous id handed out by this publisher. Without this, two pushes in
    # the same microsecond would share an '_id' and the second +save+ would
    # overwrite the first message.
    def next_id
      now = Time.now
      # Integer arithmetic avoids the rounding of Time#to_f * 1_000_000.
      stamp = now.to_i * 1_000_000 + now.usec
      @last_id = [stamp, @last_id + 1].max
    end
  end

  # Reads messages published to a named queue after the subscriber was built.
  class Subscriber
    # @param queue_name [String] name of the queue collection
    # @param mongo_connection [Object] connection; indexed with
    #   QUEUES_COLLECTION to reach the queues database
    def initialize(queue_name, mongo_connection)
      @subscribed_collection = mongo_connection[QUEUES_COLLECTION][queue_name]
      # Remember the newest existing event; everything up to and including
      # it is ignored. An empty queue has no such event, so start from 0
      # instead of crashing on nil.
      newest = @subscribed_collection.find_one({}, :sort => [['$natural', -1]])
      @earliest = newest ? newest['_id'] : 0
    end

    # Yields the 'message' of every event newer than the subscription point,
    # polling once per second while the cursor has nothing to return.
    # Runs until the block raises EndSubscriptionException.
    #
    # @yieldparam message [Object] the published payload
    # @return [nil, Enumerator] nil once the subscription is ended; an
    #   Enumerator when called without a block
    def each
      return enum_for(:each) unless block_given?

      tail = Mongo::Cursor.new(@subscribed_collection, :tailable => true, :order => [['$natural', 1]])
      loop do
        doc = tail.next_document
        if doc.nil?
          # No event to process. Wait a bit.
          sleep(1)
        elsif doc['_id'].to_i > @earliest
          yield doc['message']
        end
        # Otherwise the event predates the subscription: skip it.
      end
    rescue EndSubscriptionException
      # The consumer asked to stop; end the subscription quietly.
      nil
    end
  end
end
if $PROGRAM_NAME == __FILE__
  # Self-contained Pub/Sub demo against a local MongoDB:
  #   1. publish one message before anyone subscribes (it must be skipped),
  #   2. subscribe on a second connection and listen in a background thread,
  #   3. publish four regular messages followed by "Exit", which makes the
  #      listener end its subscription.
  publisher_connection = Mongo::Connection.new('127.0.0.1', 27017)
  puts "Starting Publisher"
  publisher = MongoPubSub::Publisher.new('test', publisher_connection)

  # Published before the subscriber exists, so it should never be displayed.
  publisher.push("This message is too old, ignore me.")

  puts "Starting Subscriber"
  subscriber_connection = Mongo::Connection.new('127.0.0.1', 27017)
  subscriber = MongoPubSub::Subscriber.new('test', subscriber_connection)

  listener = Thread.new do
    puts "Now waiting for messages"
    subscriber.each do |message|
      puts "Subscriber: received \"#{message}\""
      # "Exit" is the agreed signal to stop listening.
      raise MongoPubSub::EndSubscriptionException if message == "Exit"
    end
    puts "Ending Subscriber !"
  end

  outgoing = ['first event', 'second event', 'third evend', 'fourth event', 'Exit']
  outgoing.each do |text|
    puts "Publisher #{text}"
    publisher.push(text)
  end

  # Wait for the listener to see "Exit" and finish.
  listener.join
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment