Skip to content

Instantly share code, notes, and snippets.

@arya
Created July 19, 2010 00:19
Show Gist options
  • Save arya/480851 to your computer and use it in GitHub Desktop.
Save arya/480851 to your computer and use it in GitHub Desktop.
class MemcacheLock
class LockTimeout < StandardError; end
LOCK_VALUE = "#{Process.pid}#{Time.now.usec}".to_i # not really important right now
LOCK_TTL = 40
KEEP_DURATIONS = 100
EXPIRE_DURATION_ESTIMATE = 60
MAX_RETRIES = 20
MAX_RETRY_DELAY = 0.7
def initialize(memcache, default_duration = nil)
@memcache = memcache
@durations = Array.new(KEEP_DURATIONS)
@durations_index = 0
@duration_last_reset_at = Time.now.to_i
@default_duration = default_duration || 0.1
@duration = @default_duration
end
def lock(*records, &block)
records = records.uniq.sort_by { |r| "#{r.class}/#{r[r.class.primary_key]}" }
locked = []
begin
records.each do |record|
if self.acquire_lock_on(record)
locked << record
else
raise LockTimeout, "failed to retrieve memcached lock on: #{self.lock_key(record)}"
end
end
result = nil
duration = Benchmark.realtime { result = block.call }
self.record_duration(duration)
result
ensure
locked.each do |record|
self.release_lock_on(record)
end
end
end
protected
def duration
if @duration.nil? || (@duration_last_reset_at + EXPIRE_DURATION_ESTIMATE) < Time.now.to_i
@duration_last_reset_at = Time.now.to_i
@duration = self.calculate_duration
else
@duration
end
end
def acquire_lock_on(record)
key = self.lock_key(record)
counter_key = self.counter_key(record)
clients_waiting_for_locks = nil
begin
MAX_RETRIES.times do |try_number|
begin
@memcache.add(key, LOCK_VALUE, LOCK_TTL)
return true
rescue Memcached::NotStored
clients_waiting_for_locks ||= self.safe_incr(counter_key)
time = self.sleep_time_for(try_number, clients_waiting_for_locks)
sleep(time)
end
end
false
ensure
self.safe_decr(counter_key) if clients_waiting_for_locks
end
end
def release_lock_on(record)
begin
@memcache.delete(self.lock_key(record))
rescue Memcached::NotFound
end
end
def calculate_duration
if @durations.index(nil).nil?
@durations.inject { |a, b| a + b} / @durations.size
else
@default_duration
end
end
def record_duration(duration)
@durations[@durations_index] = duration
@durations_index += 1
@durations_index = 0 if @durations_index >= @durations.size
end
def sleep_time_for(try_number, clients_waiting_for_locks)
# this is optimal for spaced out spread
if try_number == 0
(self.duration * (clients_waiting_for_locks - 0.5))
else
self.duration / 2
end
# this one tends to be better than the one below where the clients are more spread out
# [self.duration * (clients_waiting_for_locks + 0.5), MAX_RETRY_DELAY].min
# this one tends to be better when clients enter at relatively the same time
# [self.duration * (clients_waiting_for_locks / (try_number + 1.0)), 0.1].max
end
def lock_key(record)
"#{record.class}/#{record[record.class.primary_key]}/pessimistic_lock"
end
def counter_key(record)
"#{record.class}/#{record[record.class.primary_key]}/pessimistic_lock/counter"
end
def safe_incr(key)
begin
@memcache.incr(key)
rescue Memcached::NotFound
begin
@memcache.add(key, "1", LOCK_TTL, false)
1
rescue Memcached::NotStored
@memcache.incr(key)
end
end
end
def safe_decr(key)
begin
@memcache.decr(key)
rescue Memcached::NotFound
end
end
end
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment