Last active
December 19, 2015 17:58
-
-
Save arkiver/c814c017454865a6a9a1 to your computer and use it in GitHub Desktop.
Simple ruby pub sub system
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# server.rb | |
require 'socket' | |
require 'debugger' | |
# An event can be anything: an RSS feed item, a tweet, etc.
# It simply stores the attributes it was built with and exposes them
# through a reader.
class Event
  # The attributes given to the constructor (an empty Array by default).
  attr_reader :event_attributes

  # event - the attributes describing this event; stored as-is (not copied).
  def initialize(event = [])
    @event_attributes = event
  end
end
# This class is responsible for maintaining a list of subscribers and
# dispatching to them the events that match their subscription predicates.
#
# Subscribers are bucketed by their `predicates` value; an event is delivered
# to the bucket whose key equals the event's `event_attributes`.
class Dispatcher
  # predicates => Array of subscribers registered with exactly those predicates.
  @@subscribers_hashmap = {}
  # Events waiting to be routed, oldest first.
  @@events_list = []

  # Retained for backward compatibility. This defines per-instance accessors
  # only; the pending queue itself lives in the class-level list above.
  attr_accessor :events_list

  # Returns the shared list of events that have been queued but not yet routed.
  def self.events_list
    @@events_list
  end

  # Appends an event to the pending list.
  #
  # event - any object responding to `event_attributes`.
  #
  # Returns the pending list (so callers can inspect its length).
  def self.queue(event)
    @@events_list << event
  end

  # Register subscriber, noting its subscription predicates.
  #
  # subscriber - any object responding to `predicates` and `post(event)`.
  #
  # Returns the bucket the subscriber was added to.
  def self.register_subscriber(subscriber)
    # Create the bucket on first use, then push the subscriber into it.
    (@@subscribers_hashmap[subscriber.predicates] ||= []) << subscriber
  end

  # Subscriber no longer wants to receive subscribed feeds.
  #
  # Returns nil (without raising) when no bucket exists for the subscriber's
  # predicates; otherwise returns the subscribers remaining in that bucket.
  def self.unregister_subscriber(subscriber)
    bucket = @@subscribers_hashmap[subscriber.predicates]
    return nil if bucket.nil?

    bucket.delete(subscriber)
    # Drop buckets that became empty so the map does not keep dead keys around.
    @@subscribers_hashmap.delete(subscriber.predicates) if bucket.empty?
    bucket
  end

  # Send the oldest pending event to the subscribers whose predicates equal
  # the event's attributes. At most one event is routed per call.
  #
  # Returns nil when nothing is pending; otherwise the (possibly empty)
  # list of subscribers the event was posted to.
  def route_event_to_subscribers
    # `shift` returns nil on an empty list, so no separate emptiness check is needed.
    event = @@events_list.shift
    return nil if event.nil?

    # An event nobody subscribed to is silently dropped instead of raising.
    recipients = @@subscribers_hashmap.fetch(event.event_attributes, [])
    recipients.each { |sub| sub.post(event) }
  end
end
# A consumer of events. Its subscription is described by three predicates,
# stored as an Array of Strings in the order
# [country_code, web_page_name, referrer_name].
class Subscriber
  # Hash keys read from the constructor argument, in storage order.
  PREDICATE_KEYS = [:country_code, :web_page_name, :referrer_name].freeze

  # Frozen Array of three frozen Strings. It is frozen because the dispatcher
  # uses this value as a Hash key; mutating it after registration would make
  # the subscriber's bucket unreachable.
  attr_reader :predicates

  # predicates - Hash with optional :country_code, :web_page_name and
  #              :referrer_name entries. Each value is converted with `to_s`,
  #              so a missing key (nil) becomes the empty String.
  def initialize(predicates = {
    :country_code => '',
    :web_page_name => '',
    :referrer_name => ''
  })
    # `dup` before `freeze` so the caller's own String objects stay mutable.
    @predicates = PREDICATE_KEYS.map { |key| predicates[key].to_s.dup.freeze }.freeze
  end

  # Registers this subscriber with the Dispatcher.
  def register_self
    Dispatcher.register_subscriber(self)
  end

  # Receives an event routed to this subscriber and prints its attributes
  # to standard output.
  def post(event)
    puts event.event_attributes
  end
end
# Seed the dispatcher with 10,000 subscribers whose predicates are picked at
# random. The candidate lists are built once rather than on every iteration.
country_codes = ['IN', 'CN', 'BR', 'AF', '']
web_page_names = ['home', 'about', 'contact', 'careers', '']
referrer_names = ['nytimes.org', 'pubsub.com', 'foo.org', '']
10000.times do
  Subscriber.new(
    :country_code => country_codes.sample,
    :web_page_name => web_page_names.sample,
    :referrer_name => referrer_names.sample
  ).register_self
end

dispatcher = Dispatcher.new
server = TCPServer.open(2000)
loop do
  # Send feeds to subscribers whenever there are events in the queue.
  dispatcher.route_event_to_subscribers # TODO: make this async

  # Each accepted connection is handled on its own thread.
  Thread.start(server.accept) do |client|
    begin
      # New feed received.
      # SECURITY: Marshal.load can instantiate arbitrary Ruby objects and this
      # payload comes straight off the network. Only run this against trusted
      # clients, or switch to a data-only format such as JSON.
      incoming_data = Marshal.load(client.read)

      # Queue whatever data the client has sent. `Dispatcher.queue` returns the
      # pending list, which is used to report the current backlog size.
      pending_events = Dispatcher.queue(Event.new(incoming_data))
      puts pending_events.length
    ensure
      # Always release the connection, even if decoding or queueing raised.
      client.close
    end
  end
end
# client.rb | |
require 'socket' | |
require 'debugger' | |
hostname = 'localhost'
port = 2000

# Candidate values for each position of a generated feed; built once.
country_codes = ['IN', 'CN', 'BR', 'AF', '']
web_page_names = ['home', 'about', 'contact', 'careers', '']
referrer_names = ['nytimes.org', 'pubsub.com', 'foo.org', '']

# Endlessly generate random feeds and send each one to the server over its
# own short-lived connection.
loop do
  # Generate a feed: [country code, web page name, referrer name].
  feed = [country_codes.sample, web_page_names.sample, referrer_names.sample]

  # Open a connection to the server and send the feed. The block form closes
  # the socket when the block exits, including when the write raises.
  TCPSocket.open(hostname, port) do |soc|
    soc.write(Marshal.dump(feed))
  end
end
# test | |
# def post(event) | |
# p '<------------->' | |
# puts event.event_attributes | |
# p '========' | |
# p self.predicates | |
# p '<------------->' | |
# end | |
# "<------------->" | |
# AF | |
# about | |
# foo.org | |
# "========" | |
# ["AF", "about", "foo.org"] | |
# "<------------->" |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment