Created
July 19, 2010 00:19
-
-
Save arya/480851 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Pessimistic lock over one or more records, backed by a memcache client.
#
# A lock is taken by `add`-ing a per-record key (see #lock_key); the client is
# expected to raise Memcached::NotStored when the key already exists. While a
# client waits it bumps a per-record counter (see #counter_key) so that the
# back-off delay can be scaled by the number of waiters and by a running
# estimate of how long the locked block usually takes.
#
# NOTE: a lock key is written with LOCK_TTL and is deleted unconditionally on
# release, without checking LOCK_VALUE. If a block outlives the TTL, another
# client may acquire the key and this instance will still delete it.
class MemcacheLock
  # Raised by #lock when a record's lock cannot be acquired in MAX_RETRIES tries.
  class LockTimeout < StandardError; end

  # Value stored under the lock key; never read back by this class.
  LOCK_VALUE = "#{Process.pid}#{Time.now.usec}".to_i # not really important right now
  # TTL handed to the memcache client for both lock and counter keys.
  LOCK_TTL = 40
  # Number of block durations kept for the running average.
  KEEP_DURATIONS = 100
  # Seconds after which the cached duration estimate is recomputed.
  EXPIRE_DURATION_ESTIMATE = 60
  # Maximum number of `add` attempts per record.
  MAX_RETRIES = 20
  # Only referenced by the alternative back-off strategy kept in #sleep_time_for.
  MAX_RETRY_DELAY = 0.7

  # @param memcache [Object] client responding to add/delete/incr/decr and
  #   raising Memcached::NotStored / Memcached::NotFound
  # @param default_duration [Numeric, nil] estimate used until KEEP_DURATIONS
  #   real samples have been recorded (defaults to 0.1)
  def initialize(memcache, default_duration = nil)
    @memcache = memcache
    @durations = Array.new(KEEP_DURATIONS)
    @durations_index = 0
    @duration_last_reset_at = Time.now.to_i
    @default_duration = default_duration || 0.1
    @duration = @default_duration
  end

  # Acquires the lock for every given record, runs the block, then releases
  # whatever was acquired (also when acquisition or the block fails).
  #
  # Records are de-duplicated and locked in a stable order so that two callers
  # locking the same set request the keys in the same sequence.
  #
  # @param records [Array<Object>] objects whose class responds to
  #   `primary_key` and which respond to `[]`
  # @return [Object] the block's return value
  # @raise [ArgumentError] when no block is given (checked before locking)
  # @raise [LockTimeout] when any record's lock cannot be acquired
  def lock(*records, &block)
    # Fail fast: without this check the locks would be taken before failing.
    raise ArgumentError, "MemcacheLock#lock requires a block" unless block

    records = records.uniq.sort_by { |r| "#{r.class}/#{r[r.class.primary_key]}" }
    locked = []
    begin
      records.each do |record|
        unless acquire_lock_on(record)
          raise LockTimeout, "failed to retrieve memcached lock on: #{lock_key(record)}"
        end
        locked << record
      end

      # Monotonic clock: needs no extra require and is immune to wall-clock jumps.
      started_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
      result = block.call
      record_duration(Process.clock_gettime(Process::CLOCK_MONOTONIC) - started_at)
      result
    ensure
      release_locks_on(locked)
    end
  end

  protected

  # Current estimate of how long a locked block takes. The cached value is
  # recomputed once it is older than EXPIRE_DURATION_ESTIMATE seconds.
  def duration
    now = Time.now.to_i
    if @duration.nil? || (@duration_last_reset_at + EXPIRE_DURATION_ESTIMATE) < now
      @duration_last_reset_at = now
      @duration = calculate_duration
    else
      @duration
    end
  end

  # Tries up to MAX_RETRIES times to `add` the record's lock key.
  #
  # On the first failed attempt the waiter counter is incremented; it is
  # decremented again on the way out, whatever the outcome.
  #
  # @return [Boolean] true when the lock was acquired, false otherwise
  def acquire_lock_on(record)
    key = lock_key(record)
    waiters_key = counter_key(record)
    waiters = nil
    last_try = MAX_RETRIES - 1
    begin
      MAX_RETRIES.times do |try_number|
        begin
          @memcache.add(key, LOCK_VALUE, LOCK_TTL)
          return true
        rescue Memcached::NotStored
          waiters ||= safe_incr(waiters_key)
          # Sleeping after the final attempt would only delay the failure.
          sleep(sleep_time_for(try_number, waiters)) if try_number < last_try
        end
      end
      false
    ensure
      safe_decr(waiters_key) if waiters
    end
  end

  # Releases every given record, attempting all of them even if one release
  # raises, then re-raises the first error so the failure is not hidden.
  def release_locks_on(records)
    first_error = nil
    records.each do |record|
      begin
        release_lock_on(record)
      rescue StandardError => e
        first_error ||= e
      end
    end
    raise first_error if first_error
  end

  # Deletes the record's lock key; a key that is already gone is ignored.
  def release_lock_on(record)
    @memcache.delete(lock_key(record))
  rescue Memcached::NotFound
    nil
  end

  # Average of the recorded durations, or the default estimate until the
  # sample buffer has been completely filled.
  def calculate_duration
    return @default_duration if @durations.include?(nil)

    @durations.inject { |a, b| a + b } / @durations.size
  end

  # Stores a sample in the fixed-size buffer, overwriting the oldest slot.
  def record_duration(duration)
    @durations[@durations_index] = duration
    @durations_index = (@durations_index + 1) % @durations.size
  end

  # Delay before the next attempt.
  #
  # @param try_number [Integer] zero-based attempt index
  # @param clients_waiting_for_locks [Numeric] waiter count from the counter key
  def sleep_time_for(try_number, clients_waiting_for_locks)
    # this is optimal for spaced out spread
    if try_number == 0
      duration * (clients_waiting_for_locks - 0.5)
    else
      duration / 2
    end

    # this one tends to be better than the one below where the clients are more spread out
    # [self.duration * (clients_waiting_for_locks + 0.5), MAX_RETRY_DELAY].min

    # this one tends to be better when clients enter at relatively the same time
    # [self.duration * (clients_waiting_for_locks / (try_number + 1.0)), 0.1].max
  end

  # Memcache key holding the lock for +record+.
  def lock_key(record)
    "#{record.class}/#{record[record.class.primary_key]}/pessimistic_lock"
  end

  # Memcache key holding the waiter counter for +record+.
  def counter_key(record)
    "#{record.class}/#{record[record.class.primary_key]}/pessimistic_lock/counter"
  end

  # Increments the counter, creating it at "1" when it does not exist yet.
  # If another client creates it first, falls back to a plain increment.
  def safe_incr(key)
    @memcache.incr(key)
  rescue Memcached::NotFound
    begin
      @memcache.add(key, "1", LOCK_TTL, false)
      1
    rescue Memcached::NotStored
      @memcache.incr(key)
    end
  end

  # Decrements the counter; a missing counter is ignored.
  def safe_decr(key)
    @memcache.decr(key)
  rescue Memcached::NotFound
    nil
  end
end
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment