Skip to content

Instantly share code, notes, and snippets.

@dbrgn
Created March 23, 2023 17:32
Show Gist options
  • Save dbrgn/6ada321cb768336f86ad5d98ca2bd06c to your computer and use it in GitHub Desktop.
Save dbrgn/6ada321cb768336f86ad5d98ca2bd06c to your computer and use it in GitHub Desktop.
use std::sync::Arc;
use tokio::sync::Semaphore;
/// An async latch.
///
/// Internally this is implemented using a semaphore with an initial value of 0.
/// The semaphore can be closed to "trigger" the latch. A "Closed" error when
/// awaiting the semaphore is treated as if the latch is unlocked.
#[derive(Clone)]
pub struct Latch(Arc<Semaphore>);
impl Latch {
pub fn new() -> Self {
Self(Arc::new(Semaphore::new(0)))
}
/// Unlock the semaphore and wake up all awaiters. Future awaiters will
/// immediately be granted a permit.
pub fn unlock(&self) {
self.0.close();
}
pub async fn wait(&self) {
self.0
.acquire()
.await
// We expect an acquire error (due to the semaphore having been
// closed). If we get a permit, that would indicate an
// implementation bug.
.expect_err("Latch error: Semaphore unexpectedly gave us a permit");
// If we get here, the semaphore was closed and the latch is unlocked
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn latch_unlock() {
// Create multi-threaded runtime
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
// Create latch
let latch = Latch::new();
// Spawn two threads that wait for the latch to unlock
let latch1 = latch.clone();
let task1 = runtime.spawn(async move {
println!("Wait with task 1");
latch1.wait().await;
println!("Task 1 unlocked");
});
let latch2 = latch.clone();
let task2 = runtime.spawn(async move {
println!("Wait with task 2");
latch2.wait().await;
println!("Task 2 unlocked");
});
// After 100 ms, the two tasks aren't yet done
std::thread::sleep(std::time::Duration::from_millis(100));
assert!(!task1.is_finished());
assert!(!task2.is_finished());
// Unlock the latch
latch.unlock();
// After a short moment, the two tasks should be done
std::thread::sleep(std::time::Duration::from_millis(50));
assert!(task1.is_finished());
assert!(task2.is_finished());
// Further calls to the latch should complete immediately
runtime.block_on(async move {
println!("Waiting for latch");
latch.wait().await;
println!("Latch is already unlocked");
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment