Skip to content

Instantly share code, notes, and snippets.

@arkiver
Last active December 19, 2015 17:58
Show Gist options
  • Save arkiver/c814c017454865a6a9a1 to your computer and use it in GitHub Desktop.
Save arkiver/c814c017454865a6a9a1 to your computer and use it in GitHub Desktop.
Simple Ruby pub/sub system
# server.rb
require 'socket'
require 'debugger'
# An event wraps an arbitrary payload: an RSS feed item, a tweet, etc.
class Event
  # The payload that was handed to the constructor, returned as-is.
  attr_reader :event_attributes

  # event - the payload to wrap; defaults to an empty array.
  def initialize(event = [])
    @event_attributes = event
  end
end
# Maintains the list of subscribers, bucketed by their subscription
# predicates, and dispatches each queued event to the subscribers whose
# predicates equal the event's attributes.
#
# All state lives in class variables, so every Dispatcher instance (and the
# class-level methods) share the same subscriber buckets and event queue.
class Dispatcher
  @@subscribers_hashmap = {} # predicates => Array of subscribers
  @@events_list = []         # FIFO queue of events awaiting dispatch

  # NOTE: this accessor reads/writes the instance variable @events_list,
  # which nothing in this class assigns; the real queue is @@events_list.
  # It is kept only so existing callers do not break.
  attr_accessor :events_list

  # Appends an event to the pending queue.
  #
  # event - object responding to #event_attributes.
  #
  # Returns the queue.
  def self.queue(event)
    @@events_list << event
  end

  # Registers a subscriber under its subscription predicates, creating the
  # bucket for those predicates on first use.
  #
  # subscriber - object responding to #predicates and #post.
  #
  # Returns the bucket the subscriber was added to.
  def self.register_subscriber(subscriber)
    (@@subscribers_hashmap[subscriber.predicates] ||= []) << subscriber
  end

  # Removes a subscriber that no longer wants to receive events.
  # Unregistering a subscriber whose predicates were never registered is a
  # no-op (previously it raised NoMethodError on the missing bucket).
  #
  # Returns the remaining bucket, or nil when there was no bucket.
  def self.unregister_subscriber(subscriber)
    key = subscriber.predicates
    bucket = @@subscribers_hashmap[key]
    return unless bucket

    @@subscribers_hashmap[key] = bucket - [subscriber]
  end

  # Takes the oldest queued event, if any, and posts it to every subscriber
  # whose predicates equal the event's attributes. An event that nobody
  # subscribed to is dropped (previously it raised NoMethodError).
  #
  # Returns nil when the queue is empty, otherwise the Array of values
  # returned by each subscriber's #post.
  def route_event_to_subscribers
    return if @@events_list.empty?

    event = @@events_list.shift
    # fetch with a default so an unmatched event does not create a bucket.
    recipients = @@subscribers_hashmap.fetch(event.event_attributes, [])
    recipients.map { |sub| sub.post(event) }
  end
end
# A subscriber described by three predicates: country code, web page name
# and referrer name. Predicates are stored as an array of strings in that
# fixed order.
class Subscriber
  # Keys read from the constructor hash, in the order they are stored.
  PREDICATE_KEYS = [:country_code, :web_page_name, :referrer_name].freeze

  # Array of the three predicate strings.
  attr_reader :predicates

  # predicates - hash-like object indexed by the PREDICATE_KEYS symbols.
  #              Each value is converted with #to_s, so a missing key
  #              becomes an empty string.
  def initialize(predicates = { :country_code => '', :web_page_name => '', :referrer_name => '' })
    @predicates = PREDICATE_KEYS.map { |key| predicates[key].to_s }
  end

  # Adds this subscriber to the Dispatcher's registry.
  def register_self
    Dispatcher.register_subscriber(self)
  end

  # Receives an event and prints its attributes to standard output.
  def post(event)
    puts event.event_attributes
  end
end
# Seed the dispatcher with 10,000 subscribers, each with predicates sampled
# at random from the candidate lists below (the empty string is one of the
# candidates in every list).
country_codes = ['IN', 'CN', 'BR', 'AF', '']
web_page_names = ['home', 'about', 'contact', 'careers', '']
referrer_names = ['nytimes.org', 'pubsub.com', 'foo.org', '']
10000.times do
  subscriber = Subscriber.new(
    :country_code => country_codes.sample,
    :web_page_name => web_page_names.sample,
    :referrer_name => referrer_names.sample
  )
  subscriber.register_self
end
# Accept feeds over TCP on port 2000 and queue each one as an Event.
dispatcher = Dispatcher.new
server = TCPServer.open(2000)
loop do
  # Dispatch runs once per iteration, before blocking on the next accept,
  # so at most one queued event is delivered per accepted connection.
  dispatcher.route_event_to_subscribers # TODO: make this async
  Thread.start(server.accept) do |client|
    begin
      # New feed received. The client signals end of message by closing its
      # side of the connection, which is what lets #read return.
      # SECURITY: Marshal.load can instantiate arbitrary objects from the
      # bytes it is given; only accept connections from trusted clients, or
      # switch to a data-only format.
      incoming_data = Marshal.load(client.read)
      # Queue whatever data the client has sent.
      Dispatcher.queue(Event.new(incoming_data))
      # Print the current queue length. Dispatcher exposes no class-level
      # reader for its queue, so read the class variable directly (the
      # previous call to the undefined Dispatcher.eve raised NoMethodError
      # in every handler thread).
      puts Dispatcher.class_variable_get(:@@events_list).length
    ensure
      # Release the per-connection socket even if loading the feed failed.
      client.close
    end
  end
end
# client.rb
require 'socket'
require 'debugger'
# Endlessly generate random feeds and send each one to the server over its
# own TCP connection.
hostname = 'localhost'
port = 2000
country_codes = ['IN', 'CN', 'BR', 'AF', '']
web_page_names = ['home', 'about', 'contact', 'careers', '']
referrer_names = ['nytimes.org', 'pubsub.com', 'foo.org', '']
loop do
  # Generate a feed: [country code, web page name, referrer name].
  feed = [country_codes.sample, web_page_names.sample, referrer_names.sample]
  # Open a connection to the server and send the feed. The block form closes
  # the socket when the block exits, including when the write raises.
  TCPSocket.open(hostname, port) do |soc|
    soc.write(Marshal.dump(feed))
  end
end
# test
# def post(event)
# p '<------------->'
# puts event.event_attributes
# p '========'
# p self.predicates
# p '<------------->'
# end
# "<------------->"
# AF
# about
# foo.org
# "========"
# ["AF", "about", "foo.org"]
# "<------------->"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment