@taylorthurlow
Last active April 3, 2024 22:36
Multi-reader, multi-writer, read-preferring distributed lock implementation for Ruby, backed by Redis
# WARNING: This locking algorithm is not yet production-tested and should not
# be used in production without further testing. Testing on a local machine
# seems to suggest that things are working as expected, but I am anticipating
# that there will be edge cases that I have missed. If you find one then feel
# free to contact me.
#
# The goal of this class is to model and provide a distributed (Redis-backed)
# multiple-reader, multiple-writer lock.
#
# The lock is read-preferring, meaning that readers will be able to read
# concurrently, but writers will be blocked until all readers have finished
# reading. This is a definite shortcoming, but it is currently not clear to me
# how to implement a write-preferring lock which is also distributed. There are
# plenty of reference implementations when concurrency is only needed between
# threads (write-preferring locks require the use of condition variables, and
# the ability to send "wakeup" signals to waiting "threads", which Redis does
# not appear to be able to provide).
#
# Despite being called a "read/write" lock, this class does not actually read
# or write any specific data. It is simply a lock which can be used to ensure a
# resource is not modified while it is being read, or read while it is being
# modified, and that multiple readers can read concurrently.
#
# The reference algorithm for this class is located on Wikipedia, linked below.
# The page also contains the write-preferring algorithm.
# https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Using_two_mutexes
require "digest"
# redlock - 2.0.6 at time of writing
# https://github.com/leandromoreira/redlock-rb
# Provides a Redis-backed locking mechanism, and allows any reader to release
# the lock (which is necessary for this algorithm to work).
require "redlock"
# redis-objects - 2.0.0.beta at time of writing
# https://github.com/nateware/redis-objects
# Provides Redis-backed data structures, such as counters and generic values.
require "redis-objects"
require "redis"
class DistributedReadWriteLock
  REDIS_URL = "redis://localhost:6379/0"
  REDIS_PASSWORD = nil

  # @param key [String, Symbol]
  # @param lock_acquisition_attempts [Integer] number of times to attempt to
  #   acquire a lock before giving up and raising an exception
  # @param lock_acquisition_delay [Integer] number of milliseconds to wait
  #   between lock acquisition attempts
  # @param read_lock_max_duration [Integer] number of milliseconds to hold a
  #   read lock before it expires and is automatically released
  # @param global_lock_max_duration [Integer] number of milliseconds to hold a
  #   global lock before it expires and is automatically released
  # @param debug [Boolean] whether to print debug messages
  def initialize(
    key,
    lock_acquisition_attempts: 10,
    lock_acquisition_delay: 500,
    read_lock_max_duration: 10000,
    global_lock_max_duration: 30000,
    debug: false
  )
    @key = key

    configure_redis_objects

    # A counter which tracks the number of active readers. It should only be
    # accessed while the reader lock is held. This allows us to determine when
    # the reader is the "first" reader, meaning that before the reader lock was
    # acquired, there were no other readers. This is important because the
    # "first" reader is responsible for acquiring the global lock on behalf of
    # all readers. The same is true for the "last" reader, which is responsible
    # for releasing the global lock. This mechanism is why this algorithm is
    # read-preferring - we ensure the global lock is always held by a reader,
    # if there are any readers. The global lock expires after
    # global_lock_max_duration milliseconds, so each new reader extends it on
    # arrival. The counter key itself expires 60 seconds after it was last
    # written.
    @num_readers_active = Redis::Counter.new("#{key}_num_readers_active", expireat: -> { Time.now + 60 })

    # NOTE: This resets the shared counter every time an instance is
    # constructed, including while other processes hold read locks on the
    # same key.
    @num_readers_active.value = 0

    # A marshaled hash object which is a "lock info" hash - essentially the
    # full object representation of an acquired lock as far as Redlock is
    # concerned. Readers need to be able to acquire the global lock on behalf
    # of all readers (the "first" reader is responsible for this), and any
    # reader must be able to release the global lock (the "last" reader is
    # responsible for this). In order to accomplish this, we store the lock
    # info hash in Redis, and recall it when we need to release the global
    # lock.
    @global_lock_info = Redis::Value.new("#{key}_global_lock_info", marshal: true)

    @lock_acquisition_attempts = lock_acquisition_attempts
    @lock_acquisition_delay = lock_acquisition_delay
    @read_lock_max_duration = read_lock_max_duration
    @global_lock_max_duration = global_lock_max_duration
    @debug = debug
  end

  # Obtain a read lock. Must be called with a block. The lock will be released
  # when the block returns or raises.
  #
  # @return [Object] the value returned by the block
  def read
    raise "No block provided" unless block_given?

    begin_read
    debug "Performing read"

    begin
      yield
    ensure
      end_read
    end
  end

  # Obtain a write lock. Must be called with a block. The lock will be released
  # when the block returns or raises.
  #
  # @return [Object] the value returned by the block
  def write
    raise "No block provided" unless block_given?

    if (global_lock_info = global_lock)
      debug "Acquired global lock (writer)"
      debug "Performing write"

      begin
        yield
      ensure
        redlock_client.unlock(global_lock_info)
        debug "Released global lock (writer)"
      end
    else
      raise "Failed to acquire global lock (writer)"
    end
  end

  private

  # Registers the caller as an active reader, acquiring or refreshing the
  # global lock as needed. Raises if a required lock cannot be acquired.
  #
  # @return [void]
  def begin_read
    if (reader_lock_info = reader_lock)
      debug "Acquired reader lock (begin_read) - #{reader_lock_info}"

      global_lock_info = nil
      global_lock_attempt = false

      # NOTE: Atomic increment operation. If this block raises an exception, or
      # returns a falsey value, the change to the value will be discarded.
      @num_readers_active.increment do |new_value|
        debug "Incremented @num_readers_active to #{new_value}"

        if new_value == 1
          # We are the first reader, so we need to acquire the global lock on
          # behalf of all readers, and store it for future recall by either
          # this reader, or other readers.
          global_lock_attempt = true

          if (global_lock_info = global_lock)
            debug "Acquired global lock due to being first reader"
            @global_lock_info.value = global_lock_info
          end
        elsif (global_lock_info = @global_lock_info.value)
          # We are not the first reader, but there is a global lock stored, so
          # we should refresh its expiration.
          debug "Attempting to refresh global lock expiration"
          global_lock_attempt = true

          refreshed_global_lock_info = redlock_client.lock(
            global_lock_info[:resource],
            @global_lock_max_duration,
            extend: global_lock_info
          )

          if refreshed_global_lock_info
            debug "Refreshed global lock expiration"
            @global_lock_info.value = refreshed_global_lock_info
          else
            debug "Unable to refresh global lock expiration"
          end
        end

        # True only when the first reader tried and failed to acquire the
        # global lock. A failed refresh is logged but not treated as fatal.
        failed_global_lock_acquire = !global_lock_info && global_lock_attempt

        # Ensure that we release the reader lock, even if something went wrong
        redlock_client.unlock(reader_lock_info)
        debug "Released reader lock (begin_read)"

        if failed_global_lock_acquire
          raise "Failed to acquire global lock"
        end

        # Reset the expiration time of the counter, otherwise it might
        # disappear. TODO: Ensure that the expiration length is at least as
        # long as the longest possible read operation, otherwise a long-running
        # read could cause the counter to expire and be reset to 0, then get
        # decremented to -1, which would be bad.
        @num_readers_active.reset(new_value)

        # Truthy return value is required by Redis::Counter to avoid the change
        # being discarded
        true
      end
    else
      raise "Failed to acquire reader lock (begin_read)"
    end
  end

  # @return [void]
  def end_read
    if (reader_lock_info = reader_lock)
      debug "Acquired reader lock (end_read) - #{reader_lock_info}"

      @num_readers_active.decrement do |new_value|
        debug "Decremented @num_readers_active to #{new_value}"

        failed_global_lock_unlock = false

        if new_value == 0
          if (global_lock_info = @global_lock_info.value)
            redlock_client.unlock(global_lock_info)
            @global_lock_info.value = nil
            debug "Released and cleared global lock due to being final reader"
          else
            failed_global_lock_unlock = true
            debug "Expected to find global lock info to perform unlock, but none found"
          end
        end

        redlock_client.unlock(reader_lock_info)
        debug "Released reader lock (end_read)"

        # Defer raising an exception until after we have released the reader
        # lock, otherwise we might end up in a situation where we have a reader
        # lock but no global lock, which would be bad.
        if failed_global_lock_unlock
          raise "Expected to find global lock info to perform unlock, but none found"
        end

        # Reset the expiration time of the counter, otherwise it might
        # disappear. TODO: Ensure that the expiration length is at least as
        # long as the longest possible read operation, otherwise a long-running
        # read could cause the counter to expire and be reset to 0, then get
        # decremented to -1, which would be bad.
        @num_readers_active.reset(new_value)

        # Truthy return value is required by Redis::Counter to avoid the change
        # being discarded
        true
      end
    else
      raise "Failed to acquire reader lock (end_read)"
    end
  end

  # @return [Hash, FalseClass] lock info hash if the lock was available, false
  #   otherwise
  def reader_lock
    redlock_client.lock("#{@key}_reader_lock", @read_lock_max_duration)
  end

  # @return [Hash, FalseClass] lock info hash if the lock was available, false
  #   otherwise
  def global_lock
    redlock_client.lock("#{@key}_global_lock", @global_lock_max_duration)
  end

  # @return [Redlock::Client]
  def redlock_client
    Redlock::Client.new(
      [REDIS_URL],
      retry_count: @lock_acquisition_attempts,
      retry_delay: @lock_acquisition_delay
    )
  end

  # @return [void]
  def configure_redis_objects
    redis_config = {url: REDIS_URL, password: REDIS_PASSWORD}
    # Omit the password entirely when none is configured (nil or empty).
    redis_config.delete(:password) if redis_config[:password].to_s.empty?
    Redis::Objects.redis = Redis.new(redis_config)
  end

  # @param message [String]
  #
  # @return [void]
  def debug(message)
    return unless @debug

    tid = Thread.current.object_id.to_s
    label = Digest::MD5.hexdigest(tid)[0..2]
    puts "(#{label}) #{message}"

    nil
  end
end
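
For comparison, the read-preferring "two mutexes" scheme from the Wikipedia page linked in the header comment can be sketched for threads inside a single process. This is an illustration only, not part of the gist and not a replacement for the Redis-backed class: `LocalReadWriteLock` is a hypothetical name, and the global lock is modelled here as a one-token `Queue` instead of a `Mutex`, on the assumption that a binary semaphore is the closest in-process stand-in. A Ruby `Mutex` can only be unlocked by the thread that locked it, whereas this algorithm needs the last reader to release what the first reader acquired, which is the same reason the header gives for choosing Redlock.

```ruby
# In-process sketch of a read-preferring readers-writer lock.
# LocalReadWriteLock is a hypothetical name used only for illustration.
class LocalReadWriteLock
  def initialize
    @reader_mutex = Mutex.new # guards @readers_active
    @readers_active = 0
    @global = Queue.new       # binary semaphore: one token means "free"
    @global << :token
  end

  def read
    raise ArgumentError, "No block provided" unless block_given?

    @reader_mutex.synchronize do
      @readers_active += 1
      # The first reader takes the global lock on behalf of all readers.
      @global.pop if @readers_active == 1
    end

    begin
      yield
    ensure
      @reader_mutex.synchronize do
        @readers_active -= 1
        # The last reader hands the global lock back.
        @global << :token if @readers_active.zero?
      end
    end
  end

  def write
    raise ArgumentError, "No block provided" unless block_given?

    @global.pop # blocks until no reader group or writer holds the token
    begin
      yield
    ensure
      @global << :token
    end
  end
end

# Small demo: four writers and four readers share one counter.
lock = LocalReadWriteLock.new
total = 0
writers = 4.times.map { Thread.new { 100.times { lock.write { total += 1 } } } }
readers = 4.times.map { Thread.new { 100.times { lock.read { total } } } }
(writers + readers).each(&:join)
puts "total after all writers finished: #{total}"
```

As in the distributed version, a steady stream of readers can keep the token indefinitely, so writers may starve. That is the trade-off of a read-preferring design.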
@taylorthurlow
Author

Just updated parts of the #read and #write public methods to account for the possibility that the block passed to either method raises an exception. When that happened, the lock was not guaranteed to be released.
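
A minimal sketch of why the `ensure` matters, using a hypothetical `FakeLock` stand-in that only records whether it is held (no Redis involved, and none of these names come from the gist):

```ruby
# FakeLock is a hypothetical stand-in that only tracks a "held" flag.
class FakeLock
  attr_reader :held

  def initialize
    @held = false
  end

  # Without ensure: an exception raised by the block skips the release.
  def unsafe_hold
    @held = true
    result = yield
    @held = false
    result
  end

  # With ensure: the release runs whether the block returns or raises.
  def safe_hold
    @held = true
    begin
      yield
    ensure
      @held = false
    end
  end
end

unsafe = FakeLock.new
begin
  unsafe.unsafe_hold { raise "boom" }
rescue RuntimeError
  # Swallowed for the demo; the flag is what we care about.
end

safe = FakeLock.new
begin
  safe.safe_hold { raise "boom" }
rescue RuntimeError
  # Swallowed for the demo.
end

puts "unsafe still held: #{unsafe.held}"
puts "safe still held:   #{safe.held}"
```

The `begin`/`ensure` form also evaluates to the block's value on the success path, so no temporary variable is needed to return the result.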
