Skip to content

Instantly share code, notes, and snippets.

@arkiver
Last active December 19, 2015 17:49
Show Gist options
  • Save arkiver/b91472bd2f5161492922 to your computer and use it in GitHub Desktop.
Save arkiver/b91472bd2f5161492922 to your computer and use it in GitHub Desktop.
A simple pub sub system in ruby. Using event machine
# server.rb
require 'eventmachine'
# An event can be anything, rss feed, tweet, etc
class Event
attr_reader :event_attributes
def initialize(event = [])
@event_attributes = event
end
end
# This class is responsible for maintain a list of subscribers
# to events and dispatching them those events that match their
# subscription predicates
class Dispatcher < EM::Connection
@@subscribers_hashmap = {}
@@events_list = []
attr_accessor :events_list
def receive_data(data)
queue(Event.new(Marshal.load(data)))
close_connection
end
# Register subscriber. note its subscription predicates
def self.register_subscriber(subscriber)
if @@subscribers_hashmap[subscriber.predicates].nil?
@@subscribers_hashmap.merge!(subscriber.predicates => []) # create a new bucket
end
@@subscribers_hashmap[subscriber.predicates] << subscriber # push subscriber into bucket
end
# Subscriber no longer wants to receive subscribed feeds
def self.unregister_subscriber(subscriber)
@@subscribers_hashmap[subscriber.predicates] -= ([] << subscriber)
end
# Send events to subscribers according to their subscriptions
def self.route_event_to_subscribers
EM.add_periodic_timer(1) do
next if @@events_list.empty?
@@events_list.length.times do
event = @@events_list.shift
@@subscribers_hashmap[event.event_attributes].map{|sub| sub.post(event) }
end
end
end
private
# Queue up incoming events
def queue(event)
@@events_list << event
end
end
class Subscriber
attr_reader :predicates
def initialize(predicates = {
:country_code => '',
:web_page_name => '',
:referrer_name => ''
})
@predicates = [
predicates[:country_code].to_s,
predicates[:web_page_name].to_s,
predicates[:referrer_name].to_s
]
end
def register_self
Dispatcher.register_subscriber(self)
end
def post(event)
puts event.event_attributes
end
end
a = ['IN', 'CN', 'BR', 'AF', '']
b = ['home', 'about', 'contact', 'careers', '']
c = ['nytimes.org', 'pubsub.com', 'foo.org', '']
10000.times do
Subscriber.new(
:country_code => a.sample,
:web_page_name => b.sample,
:referrer_name => c.sample).register_self
end
EM.run do
EM.defer do
Dispatcher.route_event_to_subscribers
end
EM.start_server("localhost", 8081, Dispatcher)
end
# client.rb
require 'socket'
loop{
feed = [
['IN', 'CN', 'BR', 'AF', ''].sample,
['home', 'about', 'contact', 'careers', ''].sample,
['nytimes.org', 'pubsub.com', 'foo.org', ''].sample]
soc = TCPSocket.new('localhost', 8081)
soc.write(Marshal.dump(feed))
soc.close
}
# test
# def post(event)
# p '<------------->'
# puts event.event_attributes
# p '========'
# p self.predicates
# p '<------------->'
# end
# "<------------->"
# AF
# about
# foo.org
# "========"
# ["AF", "about", "foo.org"]
# "<------------->"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment