-
-
Save slyphon/401391 to your computer and use it in GitHub Desktop.
ZooKeeper barrier implementation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
require 'rubygems' | |
gem 'zookeeper' | |
require 'zookeeper' | |
# Number of children under "/barrier" at which a participant creates the
# "/barrier/ready" node.
REQUIRED_NUMBER_OF_PROCESSES = 2
# Keeps one callback per znode path and dispatches incoming events to the
# callback registered for the event's path.
class EventHandler
  # NOTE(review): presumably meant to let instances act as a ZooKeeper
  # Watcher under JRuby -- confirm whether `include` is required instead.
  import org.apache.zookeeper.Watcher

  # Lazily created Hash mapping a path to its registered block.
  def watches
    @registered_watches ||= {}
  end

  # Stores +block+ as the callback for +path+, replacing any earlier one.
  def register(path, &block)
    watches[path] = block
  end

  # Prints a one-line summary of +event+ (class, optional path, state, type)
  # and then calls the block registered for the event's path, if there is one.
  def process(event)
    path = event.get_path

    msg = "#{event.class} received"
    msg += " path = #{path}" if path
    msg += " state = #{Zk::KEEPER_STATES[event.get_state]}"
    msg += " type = #{Zk::EVENT_TYPES[event.get_type]}"
    puts msg

    callback = watches[path]
    callback.call(event) if callback
  end
end
# Each client gets its own EventHandler so their callbacks stay separate.
zkWatcher = EventHandler.new
zk2Watcher = EventHandler.new

# Two independent sessions against the same server stand in for the two
# participants of the barrier.
zk = ZooKeeper.new(:host => "localhost:2181", :watcher => zkWatcher)
zk2 = ZooKeeper.new(:host => "localhost:2181", :watcher => zk2Watcher)
begin
  # Demo-only cleanup: remove a "/barrier/ready" node left over from an
  # earlier run. Uses the same options-hash form as every other client call
  # in this script.
  zk.delete(:path => "/barrier/ready")
rescue StandardError => e
  # Best effort: presumably the delete fails when the node is absent, which
  # is the normal case on a fresh run. Report it instead of hiding it.
  warn "ignoring failure to delete /barrier/ready: #{e.class}: #{e.message}"
end
# Callback for events on "/barrier/ready" delivered to the first client.
zkWatcher.register("/barrier/ready") do |event|
  puts "oh yeah zk1 - we got quorum"
end

# Ask for a watch on the ready node before joining.
zk.exists(:path => "/barrier/ready", :watch => true)

puts "zk1 joining process"
zk.create(:path => "/barrier/zk1", :data => "", :mode => :ephemeral)

# Create the ready node once enough children exist under "/barrier".
if zk.children(:path => "/barrier").size >= REQUIRED_NUMBER_OF_PROCESSES
  zk.create(:path => "/barrier/ready", :data => "", :mode => :persistent)
end
# Callback for events on "/barrier/ready" delivered to the second client;
# it also removes the ready node again.
zk2Watcher.register("/barrier/ready") do |event|
  puts "oh yeah zk2 - we got quorum"
  puts "removing ready node"
  zk2.delete(:path => "/barrier/ready")
end

# Ask for a watch on the ready node before joining.
zk2.exists(:path => "/barrier/ready", :watch => true)

puts "zk2 joining process"
zk2.create(:path => "/barrier/zk2", :data => "", :mode => :ephemeral)

# Create the ready node once enough children exist under "/barrier".
if zk2.children(:path => "/barrier").size >= REQUIRED_NUMBER_OF_PROCESSES
  zk2.create(:path => "/barrier/ready", :data => "", :mode => :persistent)
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment