Last active
April 3, 2024 22:36
Multi-reader, multi-writer, read-preferring distributed lock implementation for Ruby, backed by Redis
# WARNING: This locking algorithm is not yet production-tested and should not
# be used in production without further testing. Testing on a local machine
# suggests that it works as expected, but I anticipate edge cases that I have
# missed. If you find one, feel free to contact me.
#
# The goal of this class is to model and provide a distributed (Redis-backed)
# multiple-reader, multiple-writer lock.
#
# The lock is read-preferring: readers can read concurrently, and a writer is
# blocked until all readers have finished reading. Because new readers can keep
# joining while a writer waits, a steady stream of readers can keep a writer
# waiting until its acquisition attempts run out. This is a definite
# shortcoming, but it is currently not clear to me how to implement a
# write-preferring lock which is also distributed. There are plenty of
# reference implementations when concurrency is only needed between threads,
# but write-preferring locks require condition variables and the ability to
# send "wakeup" signals to waiting threads, which Redis does not appear to
# provide.
#
# Despite being called a "read/write" lock, this class does not actually read
# or write any specific data. It is simply a lock which can be used to ensure a
# resource is not modified while it is being read, or read while it is being
# modified, and that multiple readers can read concurrently.
#
# The reference algorithm for this class is located on Wikipedia, linked below.
# The page also contains the write-preferring algorithm.
# https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock#Using_two_mutexes
require "digest"

# redlock - 2.0.6 at time of writing
# https://github.com/leandromoreira/redlock-rb
# Provides a Redis-backed locking mechanism, and allows any reader to release
# the lock (which is necessary for this algorithm to work).
require "redlock"

# redis-objects - 2.0.0.beta at time of writing
# https://github.com/nateware/redis-objects
# Provides Redis-backed data structures, such as counters and generic values.
require "redis-objects"
require "redis"
class DistributedReadWriteLock
  REDIS_URL = "redis://localhost:6379/0"
  REDIS_PASSWORD = nil
  # @param key [String, Symbol]
  # @param lock_acquisition_attempts [Integer] number of times to attempt to
  #   acquire a lock before giving up and raising an exception
  # @param lock_acquisition_delay [Integer] number of milliseconds to wait
  #   between lock acquisition attempts
  # @param read_lock_max_duration [Integer] number of milliseconds to hold a
  #   read lock before it expires and is automatically released
  # @param global_lock_max_duration [Integer] number of milliseconds to hold a
  #   global lock before it expires and is automatically released
  # @param debug [Boolean] whether to print debug messages
  def initialize(
    key,
    lock_acquisition_attempts: 10,
    lock_acquisition_delay: 500,
    read_lock_max_duration: 10000,
    global_lock_max_duration: 30000,
    debug: false
  )
    @key = key

    configure_redis_objects

    # A counter which tracks the number of active readers. It should only be
    # accessed while the reader lock is held. This allows us to determine when
    # the reader is the "first" reader, meaning that before the reader lock was
    # acquired, there were no other readers. This is important because the
    # "first" reader is responsible for acquiring the global lock on behalf of
    # all readers. The same is true for the "last" reader, which is responsible
    # for releasing the global lock. This mechanism is why this algorithm is
    # read-preferring - we ensure the global lock is always held by a reader,
    # if there are any readers. The global lock expires after
    # global_lock_max_duration milliseconds, so each new reader that comes
    # along extends it. The counter key itself is given a 60-second
    # expiration.
    @num_readers_active = Redis::Counter.new("#{key}_num_readers_active", expireat: -> { Time.now + 60 })

    # NOTE: This resets the counter every time a lock object is constructed,
    # including while readers in other processes are still active.
    @num_readers_active.value = 0

    # A marshaled hash object which is a "lock info" hash - essentially the
    # full object representation of an acquired lock as far as Redlock is
    # concerned. Readers need to be able to acquire the global lock on behalf
    # of all readers (the "first" reader is responsible for this), and any
    # reader must be able to release the global lock (the "last" reader is
    # responsible for this). In order to accomplish this, we store the lock
    # info hash in Redis, and recall it when we need to release the global
    # lock.
    @global_lock_info = Redis::Value.new("#{key}_global_lock_info", marshal: true)

    @lock_acquisition_attempts = lock_acquisition_attempts
    @lock_acquisition_delay = lock_acquisition_delay
    @read_lock_max_duration = read_lock_max_duration
    @global_lock_max_duration = global_lock_max_duration
    @debug = debug
  end

  # Obtain a read lock. Must be called with a block. The lock will be released
  # when the block returns or raises.
  #
  # @return [Object] the return value of the block
  def read
    raise "No block provided" unless block_given?

    begin_read
    debug "Performing read"

    begin
      yield
    ensure
      end_read
    end
  end

  # Obtain a write lock. Must be called with a block. The lock will be released
  # when the block returns or raises.
  #
  # @return [Object] the return value of the block
  def write
    raise "No block provided" unless block_given?

    global_lock_info = global_lock
    raise "Failed to acquire global lock (writer)" unless global_lock_info

    debug "Acquired global lock (writer)"
    debug "Performing write"

    begin
      yield
    ensure
      redlock_client.unlock(global_lock_info)
      debug "Released global lock (writer)"
    end
  end

  private

  # Registers the caller as an active reader. The first reader acquires the
  # global lock on behalf of all readers; later readers refresh its expiration.
  #
  # @return [void]
  def begin_read
    if (reader_lock_info = reader_lock)
      debug "Acquired reader lock (begin_read) - #{reader_lock_info}"

      global_lock_info = nil
      global_lock_attempt = false

      # NOTE: Atomic increment operation. If this block raises an exception, or
      # returns a falsey value, the change to the value will be discarded.
      @num_readers_active.increment do |new_value|
        debug "Incremented @num_readers_active to #{new_value}"

        if new_value == 1
          # We are the first reader, so we need to acquire the global lock on
          # behalf of all readers, and store it for future recall by either
          # this reader, or other readers.
          global_lock_attempt = true
          if (global_lock_info = global_lock)
            debug "Acquired global lock due to being first reader"
            @global_lock_info.value = global_lock_info
          end
        elsif (global_lock_info = @global_lock_info.value)
          # We are not the first reader, but there is a global lock stored, so
          # we should refresh its expiration.
          debug "Attempting to refresh global lock expiration"
          global_lock_attempt = true
          if (refreshed_global_lock_info = redlock_client.lock(global_lock_info[:resource], @global_lock_max_duration, extend: global_lock_info))
            debug "Refreshed global lock expiration"
            @global_lock_info.value = refreshed_global_lock_info
          else
            debug "Unable to refresh global lock expiration"
          end
        end

        failed_global_lock_acquisition = !global_lock_info && global_lock_attempt

        # Release the reader lock before raising, so that a failed global lock
        # acquisition does not leave the reader lock held.
        redlock_client.unlock(reader_lock_info)
        debug "Released reader lock (begin_read)"

        if failed_global_lock_acquisition
          raise "Failed to acquire global lock"
        end

        # Reset the expiration time of the counter, otherwise it might
        # disappear. TODO: Ensure that the expiration length is at least as
        # long as the longest possible read operation, otherwise a long-running
        # read could cause the counter to expire and be reset to 0, then get
        # decremented to -1, which would be bad.
        @num_readers_active.reset(new_value)

        # Truthy return value is required by Redis::Counter to avoid the change
        # being discarded
        true
      end
    else
      raise "Failed to acquire reader lock (begin_read)"
    end
  end

  # @return [void]
  def end_read
    if (reader_lock_info = reader_lock)
      debug "Acquired reader lock (end_read) - #{reader_lock_info}"

      @num_readers_active.decrement do |new_value|
        debug "Decremented @num_readers_active to #{new_value}"

        failed_global_lock_unlock = false

        if new_value == 0
          if (global_lock_info = @global_lock_info.value)
            redlock_client.unlock(global_lock_info)
            @global_lock_info.value = nil
            debug "Released and cleared global lock due to being final reader"
          else
            failed_global_lock_unlock = true
            debug "Expected to find global lock info to perform unlock, but none found"
          end
        end

        redlock_client.unlock(reader_lock_info)
        debug "Released reader lock (end_read)"

        # Defer raising an exception until after we have released the reader
        # lock, otherwise we might end up in a situation where we have a reader
        # lock but no global lock, which would be bad.
        if failed_global_lock_unlock
          raise "Expected to find global lock info to perform unlock, but none found"
        end

        # Reset the expiration time of the counter, otherwise it might
        # disappear. TODO: Ensure that the expiration length is at least as
        # long as the longest possible read operation, otherwise a long-running
        # read could cause the counter to expire and be reset to 0, then get
        # decremented to -1, which would be bad.
        @num_readers_active.reset(new_value)

        # Truthy return value is required by Redis::Counter to avoid the change
        # being discarded
        true
      end
    else
      raise "Failed to acquire reader lock (end_read)"
    end
  end

  # @return [Hash, FalseClass] lock info hash if the lock was available, false
  #   otherwise
  def reader_lock
    redlock_client.lock("#{@key}_reader_lock", @read_lock_max_duration)
  end

  # @return [Hash, FalseClass] lock info hash if the lock was available, false
  #   otherwise
  def global_lock
    redlock_client.lock("#{@key}_global_lock", @global_lock_max_duration)
  end

  # Memoized so that a new client is not built for every lock operation.
  #
  # @return [Redlock::Client]
  def redlock_client
    @redlock_client ||= Redlock::Client.new(
      [REDIS_URL],
      retry_count: @lock_acquisition_attempts,
      retry_delay: @lock_acquisition_delay
    )
  end

  # @return [void]
  def configure_redis_objects
    redis_config = {url: REDIS_URL, password: REDIS_PASSWORD}

    # Plain Ruby check; `blank?` would require ActiveSupport, which is not
    # loaded by this file.
    redis_config.delete(:password) if redis_config[:password].to_s.empty?

    Redis::Objects.redis = Redis.new(redis_config)
  end

  # @param message [String]
  #
  # @return [void]
  def debug(message)
    return unless @debug

    tid = Thread.current.object_id.to_s
    label = Digest::MD5.hexdigest(tid)[0..2]

    puts "(#{label}) #{message}"

    nil
  end
end
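
For comparison, here is a minimal in-process sketch of the same read-preferring, two-lock scheme that the class comments reference. It is not part of the gist and does not touch Redis. `BinarySemaphore` and `LocalReadWriteLock` are hypothetical names introduced for illustration. A `Thread::Queue` holding one token stands in for Redlock because, like the Redlock lock info hash, it can be released by a thread other than the one that acquired it, which Ruby's `Mutex` does not allow.

```ruby
# Hypothetical helper: a binary semaphore built on Thread::Queue. Unlike
# ::Mutex, it may be released by a thread other than the one that acquired it.
class BinarySemaphore
  def initialize
    @tokens = Thread::Queue.new
    @tokens << :token
  end

  # Blocks until the single token is available.
  def acquire
    @tokens.pop
  end

  def release
    @tokens << :token
  end
end

# Hypothetical in-process analogue of DistributedReadWriteLock.
class LocalReadWriteLock
  def initialize
    @reader_gate = BinarySemaphore.new # guards @readers
    @global = BinarySemaphore.new      # held by a writer, or by the readers as a group
    @readers = 0
  end

  def read
    with_reader_gate do
      @readers += 1
      # The first reader takes the global lock on behalf of all readers.
      @global.acquire if @readers == 1
    end

    begin
      yield
    ensure
      with_reader_gate do
        @readers -= 1
        # The last reader releases the global lock, whoever acquired it.
        @global.release if @readers.zero?
      end
    end
  end

  def write
    @global.acquire
    begin
      yield
    ensure
      @global.release
    end
  end

  private

  def with_reader_gate
    @reader_gate.acquire
    begin
      yield
    ensure
      @reader_gate.release
    end
  end
end
```

Nested or concurrent `read` calls proceed together because only the first reader touches the global lock, while `write` waits until the reader count drops back to zero.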
Just updated parts of the #read and #write public methods to account for the possibility that the blocks passed to those methods raise an exception. Previously, when that happened, the lock was not released properly.
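
The pattern behind that change can be sketched in isolation. This is an illustration only: `FakeLock` and `with_lock` are hypothetical names, and `FakeLock` merely records calls instead of talking to Redis. The point is that `ensure` runs the release whether the block returns or raises, and that the value of a `begin`/`ensure` expression is the block's value, not the value of the `ensure` clause.

```ruby
# Hypothetical stand-in for a lock client; it only records what was called.
class FakeLock
  attr_reader :events

  def initialize
    @events = []
  end

  def lock
    @events << :lock
    true
  end

  def unlock
    @events << :unlock
  end
end

# Runs the block while holding the lock, releasing it even if the block raises.
def with_lock(lock)
  raise "Failed to acquire lock" unless lock.lock

  begin
    yield
  ensure
    lock.unlock
  end
end

failing_lock = FakeLock.new
begin
  with_lock(failing_lock) { raise ArgumentError, "boom" }
rescue ArgumentError
  # The exception still propagates to the caller; the lock has been released.
end
```

After the rescued failure, `failing_lock.events` contains both the acquisition and the release.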