Last active
December 19, 2015 17:49
-
-
Save arkiver/b91472bd2f5161492922 to your computer and use it in GitHub Desktop.
A simple pub/sub system in Ruby, using EventMachine.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# server.rb
require 'eventmachine'
# An event can be anything: an RSS feed item, a tweet, etc.
#
# It is a thin wrapper around the attribute list it was built from.
class Event
  # @return [Object] the attributes this event was constructed with
  attr_reader :event_attributes

  # @param event [Object] the event's attributes; defaults to an empty array
  def initialize(event = [])
    @event_attributes = event
  end
end
# This class is responsible for maintaining a list of subscribers to events
# and dispatching to them those events that match their subscription
# predicates.
#
# Subscribers are grouped into buckets keyed by their `predicates` value, and
# an event is delivered to the bucket whose key equals the event's
# `event_attributes`. Subscribers and pending events are held in class
# variables, so they are shared by every Dispatcher instance.
class Dispatcher < EM::Connection
  @@subscribers_hashmap = {}
  @@events_list = []

  # NOTE(review): this defines instance-level accessors for @events_list,
  # which nothing in this class assigns; the queue actually used is the
  # class variable @@events_list. Kept so existing callers do not break.
  attr_accessor :events_list

  # Decodes an incoming payload into an Event, queues it, and closes the
  # connection.
  #
  # SECURITY: Marshal.load on bytes read from the network can instantiate
  # arbitrary objects. Only expose this port to trusted publishers, or switch
  # to a data-only format such as JSON.
  #
  # NOTE(review): assumes the complete payload arrives in a single
  # receive_data call; a fragmented payload is rejected as undecodable.
  #
  # @param data [String] raw bytes received on the connection
  def receive_data(data)
    queue(Event.new(Marshal.load(data)))
  rescue TypeError, ArgumentError => e
    # A malformed or truncated payload must not raise out of this callback;
    # report it and drop it.
    warn "Dispatcher: dropping undecodable payload (#{e.class}: #{e.message})"
  ensure
    close_connection
  end

  # Registers a subscriber under its subscription predicates, creating the
  # bucket on first use.
  #
  # @param subscriber [#predicates] the subscriber to add
  # @return [Array] the bucket the subscriber was appended to
  def self.register_subscriber(subscriber)
    (@@subscribers_hashmap[subscriber.predicates] ||= []) << subscriber
  end

  # Removes a subscriber that no longer wants to receive its subscribed
  # feeds. Does nothing when no bucket exists for the subscriber's predicates.
  #
  # @param subscriber [#predicates] the subscriber to remove
  # @return [Array, nil] the updated bucket, or nil if there was none
  def self.unregister_subscriber(subscriber)
    key = subscriber.predicates
    bucket = @@subscribers_hashmap[key]
    return unless bucket

    # Assign a new array instead of mutating in place, so a bucket that is
    # currently being iterated is not modified underneath the iteration.
    @@subscribers_hashmap[key] = bucket - [subscriber]
  end

  # Starts a one-second periodic timer that drains the event queue and posts
  # each event to the subscribers registered under the event's attributes.
  # Events with no matching bucket are discarded.
  #
  # @return [Object] whatever EM.add_periodic_timer returns
  def self.route_event_to_subscribers
    EM.add_periodic_timer(1) do
      # Drain only the events present when the tick starts.
      @@events_list.size.times do
        event = @@events_list.shift
        recipients = @@subscribers_hashmap.fetch(event.event_attributes, [])
        recipients.each { |sub| sub.post(event) }
      end
    end
  end

  private

  # Queue up an incoming event for the next routing tick.
  def queue(event)
    @@events_list << event
  end
end
# A consumer of events, identified by the three predicates it subscribes to.
class Subscriber
  # Keys read from the predicates hash, in the order they appear in
  # #predicates.
  PREDICATE_KEYS = [:country_code, :web_page_name, :referrer_name].freeze

  # @return [Array<String>] frozen [country_code, web_page_name, referrer_name]
  attr_reader :predicates

  # Builds the predicate list from the given hash. Each value is converted
  # with #to_s, so a missing key becomes ''.
  #
  # @param predicates [Hash] values for :country_code, :web_page_name and
  #   :referrer_name
  def initialize(predicates = {
    :country_code => '',
    :web_page_name => '',
    :referrer_name => ''
  })
    # The list is used as a Hash key by Dispatcher, so it must not change
    # after registration: copy each string (String#to_s returns the caller's
    # own object) and freeze both the copies and the array.
    @predicates = PREDICATE_KEYS.map { |key| predicates[key].to_s.dup.freeze }.freeze
  end

  # Registers this subscriber with the Dispatcher.
  def register_self
    Dispatcher.register_subscriber(self)
  end

  # Receives an event routed to this subscriber and prints its attributes.
  #
  # @param event [#event_attributes] the delivered event
  def post(event)
    puts event.event_attributes
  end
end
# Candidate values for each predicate; '' is included as one of the choices.
country_codes = ['IN', 'CN', 'BR', 'AF', '']
web_page_names = ['home', 'about', 'contact', 'careers', '']
referrer_names = ['nytimes.org', 'pubsub.com', 'foo.org', '']

# Register 10,000 subscribers, each with randomly sampled predicates.
10000.times do
  Subscriber.new(
    :country_code => country_codes.sample,
    :web_page_name => web_page_names.sample,
    :referrer_name => referrer_names.sample
  ).register_self
end

EM.run do
  # Arm the routing timer directly inside the reactor block. Going through
  # EM.defer would run this call on EventMachine's thread pool, and timers
  # are meant to be created on the reactor thread.
  Dispatcher.route_event_to_subscribers

  # Accept publisher connections; Dispatcher is the connection handler.
  EM.start_server("localhost", 8081, Dispatcher)
end
# client.rb
require 'socket'

# Publish randomly generated events forever: each iteration opens a new
# connection to the server on localhost:8081, sends one marshalled
# [country_code, web_page_name, referrer_name] triple, and disconnects.
loop do
  feed = [
    ['IN', 'CN', 'BR', 'AF', ''].sample,
    ['home', 'about', 'contact', 'careers', ''].sample,
    ['nytimes.org', 'pubsub.com', 'foo.org', ''].sample
  ]

  # The block form closes the socket even when the write raises.
  TCPSocket.open('localhost', 8081) do |soc|
    soc.write(Marshal.dump(feed))
  end
end
# test | |
# def post(event) | |
# p '<------------->' | |
# puts event.event_attributes | |
# p '========' | |
# p self.predicates | |
# p '<------------->' | |
# end | |
# "<------------->" | |
# AF | |
# about | |
# foo.org | |
# "========" | |
# ["AF", "about", "foo.org"] | |
# "<------------->" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment