Last active
June 20, 2023 07:15
-
-
Save hkolbeck/f8d99c6d20127ad4a2c9c87a69bfaa45 to your computer and use it in GitHub Desktop.
Kotlin and Rust Read-Write locks where readers never block, though values may be stale
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package industries.hannah.cachelock | |
import java.util.concurrent.atomic.AtomicReference | |
import java.util.concurrent.locks.Condition | |
import java.util.concurrent.locks.ReentrantLock | |
import kotlin.concurrent.withLock | |
/**
 * A read-write lock whose readers never block, at the price of possibly observing a stale value.
 *
 * Two copies of the guarded data are kept: a writer-side copy, mutated only while the internal
 * [ReentrantLock] is held, and a reader-side snapshot (produced by the supplied `clone` function)
 * that is republished after every successful write. All access happens inside caller-supplied
 * lambdas. Nothing stops a `read` lambda from mutating the snapshot or leaking a reference to it,
 * which is equally true of stdlib-style locks in Kotlin.
 *
 * If the write lambda or `clone` throws during [write], the lock becomes poisoned: every later
 * [read] and [write] throws [PoisonException] until [repair] is called.
 *
 * @param initVal initial value; it becomes the writer-side copy and `clone(initVal)` becomes the
 *   first reader snapshot
 * @param fair fairness policy forwarded to the internal [ReentrantLock]
 * @param clone produces the independent copy that is handed to readers
 */
class CachingRwLock<T>(initVal: T, fair: Boolean, private val clone: (T) -> T) {
    private val writeLock = ReentrantLock(fair)

    // Only touched while writeLock is held.
    private var writeValue = initVal

    // Last successfully published snapshot; readers never see a half-written value.
    private val readVal = AtomicReference(clone(initVal))

    // Non-null while the lock is poisoned; holds the failure that poisoned it.
    private val poison = AtomicReference<Throwable?>(null)

    /** Throws [PoisonException] wrapping the recorded failure if the lock is currently poisoned. */
    private fun throwIfPoisoned() {
        poison.get()?.let { throw PoisonException(it) }
    }

    /**
     * Runs [action] against the most recently published snapshot without taking any lock.
     *
     * @throws PoisonException if the lock is poisoned when the call starts
     */
    fun read(action: (T) -> Unit) {
        throwIfPoisoned()
        // The lock could become poisoned between the check above and this call, but the snapshot
        // is still the last good value, so at worst the *next* read fails instead of this one.
        action(readVal.get())
    }

    /**
     * Replaces the writer-side value with `action(current)` and publishes a clone of the result
     * to readers. Writers are serialized by the internal lock.
     *
     * @throws PoisonException if the lock is already poisoned, or if [action] or `clone` throws
     *   (in which case the lock becomes poisoned and the original failure is the cause)
     */
    fun write(action: (T) -> T) {
        writeLock.withLock {
            throwIfPoisoned()
            try {
                writeValue = action(writeValue)
                readVal.set(clone(writeValue))
            } catch (t: Throwable) {
                poison.set(t)
                throw PoisonException(t)
            }
        }
    }

    /**
     * Resets both copies to [t] and clears any poison.
     *
     * The reader snapshot is cloned *before* anything is modified, so if `clone` throws the
     * exception propagates and the lock is left exactly as it was.
     */
    fun repair(t: T) {
        writeLock.withLock {
            val snapshot = clone(t)
            writeValue = t
            readVal.set(snapshot)
            poison.set(null)
        }
    }

    // Present a roughly ReadWriteLock-shaped API, other than not being able to lock/unlock
    // independently. Everything below delegates straight to the internal write lock.
    // NOTE(review): the Condition-taking methods delegate to a private lock from which this class
    // never creates a Condition, so it is unclear what callers can legitimately pass - confirm.

    /** Delegates to [ReentrantLock.getWaitQueueLength] on the write lock. */
    fun getWriteWaitQueueLength(condition: Condition) = writeLock.getWaitQueueLength(condition)

    /** Delegates to [ReentrantLock.hasQueuedThread] on the write lock. */
    fun hasWriteQueuedThread(thread: Thread) = writeLock.hasQueuedThread(thread)

    /** Delegates to [ReentrantLock.hasQueuedThreads] on the write lock. */
    fun hasWriteQueuedThreads() = writeLock.hasQueuedThreads()

    /** Delegates to [ReentrantLock.hasWaiters] on the write lock. */
    fun hasWriteWaiters(condition: Condition) = writeLock.hasWaiters(condition)

    /** Whether the write lock was created with the fair ordering policy. */
    fun isFair() = writeLock.isFair

    /** Delegates to [ReentrantLock.getQueueLength] on the write lock. */
    fun writeQueueLength() = writeLock.queueLength
}

/**
 * Thrown by [CachingRwLock.read] and [CachingRwLock.write] when the lock is poisoned; the cause
 * is the failure that poisoned it.
 */
class PoisonException(cause: Throwable) : RuntimeException(cause)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
extern crate core; | |
use std::borrow::BorrowMut; | |
use std::sync::{Arc, Mutex}; | |
use std::sync::atomic::{AtomicBool, Ordering}; | |
use arc_swap::ArcSwap; | |
/// A Read-Write lock designed to allow readers to proceed unblocked while writers are manipulating | |
/// the guarded data. As a first go-round this requires that all operations take place in a provided | |
/// closure. | |
/// | |
/// This dances a fine line between an RwLock and an ArcSwap, and targets use cases where: | |
/// 1. Performing the write operations needs to put the value through invalid states | |
/// 2. Those write operations are expensive in terms of time or IO | |
/// 3. The guarded data is small enough to be stored doubly. That could mean very different things | |
/// depending on context | |
/// 4. Reads are frequent and in a critical path, making an RwLock contentious | |
/// | |
/// If the provided closure returns an Err result, the lock becomes poisoned and is not usable until | |
/// repair() is called. | |
pub struct CachingRwLock<T> { | |
write_lock: Mutex<T>, | |
read_arc: ArcSwap<T>, | |
poisoned: AtomicBool, | |
} | |
pub type Result<T> = std::result::Result<T, String>; | |
impl<T: Clone> CachingRwLock<T> { | |
fn new(init: T) -> CachingRwLock<T> { | |
CachingRwLock { | |
read_arc: ArcSwap::new(Arc::new(init.clone())), | |
write_lock: Mutex::new(init), | |
poisoned: AtomicBool::new(false), | |
} | |
} | |
// `poisoned` could become true between check and load, but the read_arc will still | |
// hold the last good value so only the next read failing is probably fine | |
fn read(&self) -> Result<Arc<T>> { | |
if self.poisoned.load(Ordering::Relaxed) { | |
Err("Poisoned".to_string()) | |
} else { | |
Ok(self.read_arc.load().clone()) | |
} | |
} | |
fn write(&self, op: fn(&mut T) -> Result<()>) -> Result<()> { | |
match self.write_lock.lock() { | |
Ok(mut t) => { | |
if self.poisoned.load(Ordering::Relaxed) { | |
return Err("Poisoned!".to_string()); | |
} | |
match op(t.borrow_mut()) { | |
Ok(()) => { | |
self.read_arc.swap(Arc::new(t.clone())); | |
Ok(()) | |
} | |
Err(e) => { | |
self.poisoned.swap(true, Ordering::Relaxed); | |
Err(e) | |
} | |
} | |
} | |
Err(e) => { | |
self.poisoned.swap(true, Ordering::Relaxed); | |
Err(e.to_string()) | |
} | |
} | |
} | |
fn repair(&self, new_t: T) -> Result<()> { | |
match self.write_lock.lock() { | |
Ok(mut t) => { | |
self.read_arc.swap(Arc::new(new_t.clone())); | |
*t = new_t; | |
self.poisoned.swap(false, Ordering::Relaxed); | |
Ok(()) | |
} | |
Err(e) => { | |
Err(format!("Inner lock is poisoned: {}", e)) | |
} | |
} | |
} | |
} | |
#[cfg(test)]
mod tests {
    use super::CachingRwLock;

    /// Non-`Copy` payload with heap-allocated contents, used to check that the reader snapshot
    /// is a real copy of a compound value.
    #[derive(Debug, Clone)]
    struct Thing {
        byte: u8,
        name: String,
    }

    /// A write to a primitive value becomes visible to the next read.
    #[test]
    fn it_works_int() {
        let int_lock = CachingRwLock::new(10);
        assert_eq!(*int_lock.read().expect("fresh lock must be readable"), 10);

        int_lock
            .write(|val| {
                *val = 11;
                Ok(())
            })
            .expect("write should succeed");

        assert_eq!(*int_lock.read().expect("read after write"), 11);
    }

    /// A write to a compound value updates every field seen by the next read.
    #[test]
    fn it_works_thing() {
        let thing_lock = CachingRwLock::new(Thing {
            byte: 8,
            name: "Hi".to_string(),
        });

        let before = thing_lock.read().expect("fresh lock must be readable");
        assert_eq!(before.byte, 8);
        assert_eq!(before.name, "Hi");

        thing_lock
            .write(|thing| {
                thing.name = "Ok bye".to_string();
                thing.byte += 1;
                Ok(())
            })
            .expect("write should succeed");

        let after = thing_lock.read().expect("read after write");
        assert_eq!(after.byte, 9);
        assert_eq!(after.name, "Ok bye");
    }

    /// A failed write poisons the lock for readers and writers until `repair` is called.
    #[test]
    fn repair_works() {
        let int_lock = CachingRwLock::new(10);

        int_lock
            .write(|val| {
                *val = 11;
                Ok(())
            })
            .expect("write should succeed");
        assert_eq!(*int_lock.read().expect("read after write"), 11);

        assert!(
            int_lock.write(|_| Err("Expected".to_string())).is_err(),
            "Write returned Ok even though op failed"
        );
        assert!(int_lock.read().is_err(), "Lock should be poisoned!");
        assert_eq!(
            int_lock.write(|_| Ok(())),
            Err("Poisoned!".to_string()),
            "a poisoned lock must reject writes"
        );

        int_lock.repair(7).expect("Repair failed!");
        assert_eq!(*int_lock.read().expect("read after repair"), 7);
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment