Skip to content

Instantly share code, notes, and snippets.

@ewoodh2o
Created March 30, 2012 14:50
Show Gist options
  • Save ewoodh2o/2252058 to your computer and use it in GitHub Desktop.
Save ewoodh2o/2252058 to your computer and use it in GitHub Desktop.
ZMQ Pub/Sub Exploration

Design Goals

  1. Scribe has a single listener, with multiple clients sending it data (multiple Tabs unicorns, etc)
  2. If Scribe goes down temporarily, messages should queue on the client and be delivered when it comes back.
  3. If Scribe is down for a long time, the client should throw away messages rather than exhausting its RAM.
  4. If Scribe is down and the client terminates, it shouldn't wait around forever trying to deliver messages.

In the code below, pub.rb goes in the Scribe client gem and gets installed in Tabs, etc, with a similar thing in Deejay. The sub.rb piece is part of Scribe.

require 'rubygems'
require 'zmq'
context = ZMQ::Context.new
push = context.socket(ZMQ::PUB)
push.setsockopt(ZMQ::HWM, 5)
push.setsockopt(ZMQ::LINGER, 5000)
push.connect("tcp://127.0.0.1:5555")
# Send block of messages, or default to one at a time
count = (ARGV[0] || 1).to_i
# Broadcast 20 updates, sleeping between
20.times do |i|
count.times do |j|
puts "> Update #{i}.#{j}"
push.send("Update #{i}.#{j}")
end
sleep(1)
end
puts "> END"
push.send("END")
push.close
context.close
require 'rubygems'
require 'zmq'
context = ZMQ::Context.new
# Start listening
pull = context.socket(ZMQ::SUB)
pull.setsockopt(ZMQ::IDENTITY, 'Scribe')
pull.setsockopt(ZMQ::SUBSCRIBE, '')
pull.bind("tcp://*:5555")
# Get updates, exit on ^C
begin
while line = pull.recv
puts "< #{line}"
end
rescue Exception => e
end
pull.close
context.close
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment