Skip to content

Instantly share code, notes, and snippets.

@hkolbeck
Last active June 20, 2023 07:15
Show Gist options
  • Save hkolbeck/f8d99c6d20127ad4a2c9c87a69bfaa45 to your computer and use it in GitHub Desktop.
Save hkolbeck/f8d99c6d20127ad4a2c9c87a69bfaa45 to your computer and use it in GitHub Desktop.
Kotlin and Rust Read-Write locks where readers never block, though values may be stale
package industries.hannah.cachelock
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.locks.Condition
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
/**
* A Read-Write lock designed to allow readers to proceed unblocked while writers are manipulating the guarded
* data. As a first go-round this requires that all operations take place in a provided lambda. Unlike Rust
* there's no way to protect against modification in read() or saving aside a reference to the contained data,
* but then that's true of stdlib-style locks in Kotlin as well.
*
* If the provided lambda or the provided clone() operation throw during a write, the lock becomes poisoned and
* is not usable until repair() is called.
*/
class CachingRwLock<T>(initVal: T, fair: Boolean, private val clone: (T) -> T) {
private val writeLock = ReentrantLock(fair)
private var writeValue = initVal
private val readVal = AtomicReference(clone(initVal))
private var poison: AtomicReference<Throwable?> = AtomicReference(null)
fun read(action: (T) -> Unit) {
poison.get()?.run { throw PoisonException(this) }
// The lock could be poisoned between the above check and this
// call, but it'll still be the last good value
action(readVal.get())
}
fun write(action: (T) -> T) {
writeLock.withLock {
poison.get()?.run { throw PoisonException(this) }
try {
writeValue = action(writeValue)
readVal.set(clone(writeValue))
} catch (t: Throwable) {
poison.set(t)
throw PoisonException(t)
}
}
}
fun repair(t: T) {
writeLock.withLock {
writeValue = t
readVal.set(clone(t))
poison.set(null)
}
}
// Present a roughly ReadWriteLock shaped API other than not being able to lock/unlock independently
fun getWriteWaitQueueLength(condition: Condition) = writeLock.getWaitQueueLength(condition)
fun hasWriteQueuedThread(thread: Thread) = writeLock.hasQueuedThread(thread)
fun hasWriteQueuedThreads() = writeLock.hasQueuedThreads()
fun hasWriteWaiters(condition: Condition) = writeLock.hasWaiters(condition)
fun isFair() = writeLock.isFair
fun writeQueueLength() = writeLock.queueLength
}
class PoisonException(cause: Throwable) : RuntimeException(cause)
extern crate core;
use std::borrow::BorrowMut;
use std::sync::{Arc, Mutex};
use std::sync::atomic::{AtomicBool, Ordering};
use arc_swap::ArcSwap;
/// A Read-Write lock designed to allow readers to proceed unblocked while writers are manipulating
/// the guarded data. As a first go-round this requires that all operations take place in a provided
/// closure.
///
/// This dances a fine line between an RwLock and an ArcSwap, and targets use cases where:
/// 1. Performing the write operations needs to put the value through invalid states
/// 2. Those write operations are expensive in terms of time or IO
/// 3. The guarded data is small enough to be stored doubly. That could mean very different things
/// depending on context
/// 4. Reads are frequent and in a critical path, making an RwLock contentious
///
/// If the provided closure returns an Err result, the lock becomes poisoned and is not usable until
/// repair() is called.
pub struct CachingRwLock<T> {
write_lock: Mutex<T>,
read_arc: ArcSwap<T>,
poisoned: AtomicBool,
}
pub type Result<T> = std::result::Result<T, String>;
impl<T: Clone> CachingRwLock<T> {
fn new(init: T) -> CachingRwLock<T> {
CachingRwLock {
read_arc: ArcSwap::new(Arc::new(init.clone())),
write_lock: Mutex::new(init),
poisoned: AtomicBool::new(false),
}
}
// `poisoned` could become true between check and load, but the read_arc will still
// hold the last good value so only the next read failing is probably fine
fn read(&self) -> Result<Arc<T>> {
if self.poisoned.load(Ordering::Relaxed) {
Err("Poisoned".to_string())
} else {
Ok(self.read_arc.load().clone())
}
}
fn write(&self, op: fn(&mut T) -> Result<()>) -> Result<()> {
match self.write_lock.lock() {
Ok(mut t) => {
if self.poisoned.load(Ordering::Relaxed) {
return Err("Poisoned!".to_string());
}
match op(t.borrow_mut()) {
Ok(()) => {
self.read_arc.swap(Arc::new(t.clone()));
Ok(())
}
Err(e) => {
self.poisoned.swap(true, Ordering::Relaxed);
Err(e)
}
}
}
Err(e) => {
self.poisoned.swap(true, Ordering::Relaxed);
Err(e.to_string())
}
}
}
fn repair(&self, new_t: T) -> Result<()> {
match self.write_lock.lock() {
Ok(mut t) => {
self.read_arc.swap(Arc::new(new_t.clone()));
*t = new_t;
self.poisoned.swap(false, Ordering::Relaxed);
Ok(())
}
Err(e) => {
Err(format!("Inner lock is poisoned: {}", e))
}
}
}
}
#[cfg(test)]
mod tests {
use std::borrow::Borrow;
use std::fmt::Debug;
use crate::CachingRwLock;
#[derive(Debug)]
struct Thing {
byte: u8,
name: Box<String>,
}
impl Clone for Thing {
fn clone(&self) -> Self {
let x: &String = self.name.borrow();
Thing {
byte: self.byte,
name: Box::new(x.clone()),
}
}
}
#[test]
fn it_works_int() {
let actual_int_lock = CachingRwLock::new(10);
let int_lock = &actual_int_lock;
match int_lock.read() {
Ok(i) => assert_eq!(*i, 10),
Err(e) => panic!("{}", e),
}
if let Err(e) = int_lock.write(|val| {
*val = 11;
Ok(())
}) {
panic!("{}", e)
}
match int_lock.read() {
Ok(i) => assert_eq!(*i, 11),
Err(e) => panic!("{}", e),
}
}
#[test]
fn it_works_thing() {
let actual_thing_lock = CachingRwLock::new(Thing {
byte: 8,
name: Box::new("Hi".to_string()),
});
let thing_lock = &actual_thing_lock;
match thing_lock.read() {
Ok(t) => {
assert_eq!(t.byte, 8);
assert_eq!(*t.name, "Hi".to_string())
}
Err(e) => panic!("{}", e),
}
if let Err(e) = thing_lock.write(|thing| {
*thing.name = "Ok bye".to_string();
thing.byte += 1;
Ok(())
}) {
panic!("{}", e);
}
match thing_lock.read() {
Ok(t) => {
assert_eq!(t.byte, 9);
assert_eq!(t.name, Box::new("Ok bye".to_string()))
}
Err(e) => panic!("{}", e),
}
}
#[test]
fn repair_works() {
let actual_int_lock = CachingRwLock::new(10);
let int_lock = &actual_int_lock;
if let Err(e) = int_lock.write(|val| {
*val = 11;
Ok(())
}) {
panic!("{}", e)
}
match int_lock.read() {
Ok(i) => assert_eq!(*i, 11),
Err(e) => panic!("{}", e),
}
if let Ok(()) = int_lock.write(|_| {
Err("Expected".to_string())
}) {
panic!("Write returned Ok even though op failed")
}
if let Ok(t) = int_lock.read() {
panic!("Lock should be poisoned! Got {}", t)
}
if let Err(e) = int_lock.repair(7) {
panic!("Repair failed! {}", e)
}
match int_lock.read() {
Ok(i) => assert_eq!(*i, 7),
Err(e) => panic!("{}", e),
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment