Created
March 23, 2023 17:32
-
-
Save dbrgn/6ada321cb768336f86ad5d98ca2bd06c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::sync::Arc; | |
use tokio::sync::Semaphore; | |
/// An async latch. | |
/// | |
/// Internally this is implemented using a semaphore with an initial value of 0. | |
/// The semaphore can be closed to "trigger" the latch. A "Closed" error when | |
/// awaiting the semaphore is treated as if the latch is unlocked. | |
#[derive(Clone)] | |
pub struct Latch(Arc<Semaphore>); | |
impl Latch { | |
pub fn new() -> Self { | |
Self(Arc::new(Semaphore::new(0))) | |
} | |
/// Unlock the semaphore and wake up all awaiters. Future awaiters will | |
/// immediately be granted a permit. | |
pub fn unlock(&self) { | |
self.0.close(); | |
} | |
pub async fn wait(&self) { | |
self.0 | |
.acquire() | |
.await | |
// We expect an acquire error (due to the semaphore having been | |
// closed). If we get a permit, that would indicate an | |
// implementation bug. | |
.expect_err("Latch error: Semaphore unexpectedly gave us a permit"); | |
// If we get here, the semaphore was closed and the latch is unlocked | |
} | |
} | |
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use super::*;

    /// Poll `done` until it returns `true` or `timeout` has elapsed.
    ///
    /// Returns whether the condition was met in time. Polling against a
    /// generous deadline keeps the test fast in the common case without
    /// relying on a fixed, scheduler-dependent sleep.
    fn wait_until(timeout: Duration, done: impl Fn() -> bool) -> bool {
        let deadline = Instant::now() + timeout;
        while !done() {
            if Instant::now() >= deadline {
                return false;
            }
            std::thread::sleep(Duration::from_millis(1));
        }
        true
    }

    #[test]
    fn latch_unlock() {
        // Create multi-threaded runtime
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .enable_all()
            .build()
            .unwrap();

        // Create latch
        let latch = Latch::new();

        // Spawn two tasks that wait for the latch to unlock
        let latch1 = latch.clone();
        let task1 = runtime.spawn(async move {
            println!("Wait with task 1");
            latch1.wait().await;
            println!("Task 1 unlocked");
        });
        let latch2 = latch.clone();
        let task2 = runtime.spawn(async move {
            println!("Wait with task 2");
            latch2.wait().await;
            println!("Task 2 unlocked");
        });

        // After 100 ms, the two tasks aren't yet done. (This check cannot
        // fail spuriously: the tasks cannot finish before `unlock()`.)
        std::thread::sleep(Duration::from_millis(100));
        assert!(!task1.is_finished());
        assert!(!task2.is_finished());

        // Unlock the latch
        latch.unlock();

        // Both tasks should now finish. Use a generous deadline instead of a
        // fixed short sleep so a loaded machine does not cause a false failure.
        assert!(
            wait_until(Duration::from_secs(5), || {
                task1.is_finished() && task2.is_finished()
            }),
            "tasks did not finish after the latch was unlocked"
        );

        // `is_finished()` is also true for a task that panicked, so inspect
        // the join results to make sure both tasks completed normally.
        runtime.block_on(async move {
            task1.await.expect("task 1 panicked");
            task2.await.expect("task 2 panicked");
        });

        // Further calls to the latch should complete immediately
        runtime.block_on(async move {
            println!("Waiting for latch");
            latch.wait().await;
            println!("Latch is already unlocked");
        })
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment