Skip to content

Instantly share code, notes, and snippets.

@ngoldbaum
Created September 19, 2024 19:33
Show Gist options
  • Save ngoldbaum/873332b1fa2a1a1dbe1171f5e414e6cc to your computer and use it in GitHub Desktop.
Save ngoldbaum/873332b1fa2a1a1dbe1171f5e414e6cc to your computer and use it in GitHub Desktop.
/// Checks that `PyMutex::lock` blocks a second thread while a first thread
/// still holds the guard.
///
/// Choreography:
/// 1. Thread one takes the lock and announces it.
/// 2. Thread two waits for that announcement, announces that it is about to
///    lock, and then calls `lock()`.
/// 3. Thread one, still holding the guard, waits for thread two's
///    announcement, hands off with the main thread over a rendezvous channel,
///    marks `finished`, and only then releases the guard.
/// 4. When thread two finally obtains the guard it asserts that `finished`
///    was already set, i.e. that it did not get the lock early.
#[test]
fn test_pymutex_blocks() {
    // Function-local import: keeps this test self-contained without touching
    // the file-level `use` list.
    use std::sync::atomic::{AtomicBool, Ordering};

    let mutex = OnceLock::<PyMutex<()>>::new();
    // Plain flags; `AtomicBool` states the intent more directly than a
    // `OnceLock<bool>` whose stored value is never inspected.
    let first_thread_locked_once = AtomicBool::new(false);
    let second_thread_locked_once = AtomicBool::new(false);
    let finished = AtomicBool::new(false);
    // Capacity 0 makes this a rendezvous channel: `send` does not return
    // until the matching `recv` takes the value.
    let (sender, receiver) = sync_channel::<bool>(0);
    mutex.get_or_init(|| PyMutex::new(()));
    std::thread::scope(|s| {
        s.spawn(|| {
            let guard = mutex.get().unwrap().lock();
            first_thread_locked_once.store(true, Ordering::SeqCst);
            // Keep holding the lock until the second thread has signalled
            // that it is about to call `lock()`.
            while !second_thread_locked_once.load(Ordering::SeqCst) {
                std::hint::spin_loop();
            }
            // Wait a little to guard against the unlikely event that
            // the other thread isn't blocked on acquiring the mutex yet.
            // If PyMutex had a try_lock implementation this would be
            // unnecessary
            std::thread::sleep(std::time::Duration::from_millis(10));
            // block (and hold the mutex) until the receiver actually receives something
            sender.send(true).unwrap();
            finished.store(true, Ordering::SeqCst);
            drop(guard);
        });
        s.spawn(|| {
            // Do not race the first thread for the lock: wait until it owns it.
            while !first_thread_locked_once.load(Ordering::SeqCst) {
                std::hint::spin_loop();
            }
            let mutex = mutex.get().unwrap();
            // Set *before* locking: this flag means "about to lock", which is
            // why the first thread sleeps briefly after observing it.
            second_thread_locked_once.store(true, Ordering::SeqCst);
            let guard = mutex.lock();
            assert!(
                finished.load(Ordering::SeqCst),
                "second thread acquired the PyMutex before the first thread released it"
            );
            drop(guard);
        });
        // threads are blocked until we receive
        receiver.recv().unwrap();
    });
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment