|
require 'zk' |
|
|
|
module Zookeeper |
|
class DistributedHashtable |
|
def initialize(zk, path) |
|
@zk = zk |
|
@path = path |
|
@mutex = Mutex.new |
|
@callbacks = [] |
|
|
|
configure_watches |
|
end |
|
|
|
def on_change(&block) |
|
@mutex.synchronize do |
|
@callbacks << block |
|
end |
|
|
|
block.call |
|
end |
|
|
|
def fire |
|
callbacks = @mutex.synchronize do |
|
@callbacks.dup |
|
end |
|
|
|
callbacks.each do |cb| |
|
begin |
|
cb.call |
|
rescue => e |
|
# Report an exception somewhere |
|
end |
|
end |
|
end |
|
|
|
def [](key) |
|
@mutex.synchronize do |
|
if @hashtable |
|
return @hashtable[key] |
|
end |
|
end |
|
end |
|
|
|
def []=(key, value) |
|
result = @mutex.synchronize do |
|
update do |hashtable| |
|
hashtable[key] = value |
|
end |
|
end |
|
|
|
fire |
|
|
|
return result |
|
end |
|
|
|
def has_key?(key) |
|
@mutex.synchronize do |
|
if @hashtable |
|
@hashtable.has_key?(key) |
|
end |
|
end |
|
end |
|
|
|
def delete(key) |
|
result = @mutex.synchronize do |
|
update do |hashtable| |
|
hashtable.delete(key) |
|
end |
|
end |
|
|
|
fire |
|
|
|
return result |
|
end |
|
|
|
def merge(other) |
|
result = @mutex.synchronize do |
|
update do |hashtable| |
|
hashtable.merge(other) |
|
end |
|
end |
|
|
|
fire |
|
|
|
return result |
|
end |
|
|
|
def to_h |
|
@mutex.synchronize do |
|
if @hashtable |
|
@hashtable.dup |
|
else |
|
{} |
|
end |
|
end |
|
end |
|
|
|
def each(&block) |
|
to_h.each(&block) |
|
end |
|
|
|
def length |
|
@mutex.synchronize do |
|
if @hashtable |
|
@hashtable.length |
|
else |
|
0 |
|
end |
|
end |
|
end |
|
|
|
def empty? |
|
length == 0 |
|
end |
|
alias_method :blank?, :empty? |
|
|
|
def read |
|
@mutex.synchronize do |
|
begin |
|
current, _ = @zk.get(@path, :watch => true) |
|
@hashtable = Yajl::Parser.parse(current) |
|
rescue ZK::Exceptions::NoNode |
|
if @zk.exists?(@path, :watch => true) |
|
retry |
|
else |
|
@hashtable = Hash.new |
|
end |
|
end |
|
end |
|
|
|
fire |
|
end |
|
|
|
def update(&block) |
|
return update_exists(&block) |
|
rescue ZK::Exceptions::NoNode |
|
begin |
|
return update_initial(&block) |
|
rescue ZK::Exceptions::NodeExists |
|
return update_exists(&block) |
|
end |
|
end |
|
|
|
def update_exists(&block) |
|
begin |
|
current, stat = @zk.get(@path, :watch => true) |
|
hashtable = Yajl::Parser.parse(current) |
|
|
|
result = block.call(hashtable) |
|
|
|
@zk.set(@path, Yajl::Encoder.encode(hashtable), :version => stat.version) |
|
@hashtable = hashtable |
|
|
|
return result |
|
rescue ZK::Exceptions::BadVersion |
|
sleep 0.1 + rand |
|
retry |
|
end |
|
end |
|
|
|
def update_initial(&block) |
|
begin |
|
hashtable = Hash.new |
|
|
|
result = block.call(hashtable) |
|
|
|
@zk.create(@path, Yajl::Encoder.encode(hashtable)) |
|
@hashtable = hashtable |
|
|
|
return result |
|
rescue ZK::Exceptions::NoNode |
|
@zk.mkdir_p(File.dirname(@path)) |
|
retry |
|
end |
|
end |
|
|
|
def configure_watches |
|
@register ||= @zk.register(@path) do |
|
read |
|
end |
|
|
|
@on_connected ||= @zk.on_connected do |
|
read |
|
end |
|
|
|
begin |
|
read |
|
rescue ZK::Exceptions::OperationTimeOut |
|
# Ignore these, we'll get them next time |
|
|
|
# record something with your metrics provider |
|
rescue ::Zookeeper::Exceptions::ContinuationTimeoutError |
|
# Ignore these, we'll get them next time |
|
|
|
# record something with your metrics provider |
|
|
|
rescue ::Zookeeper::Exceptions::NotConnected |
|
# Ignore these, we'll get them next time |
|
|
|
# record something with your metrics provider |
|
end |
|
end |
|
end |
|
end |
|
end |