Last active
November 10, 2025 17:36
-
-
Save matthewoestreich/a62e93dff414512f5db41b210df0f088 to your computer and use it in GitHub Desktop.
Thread safe VecDeque
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| pub mod threadsafe { | |
| use std::{ | |
| collections::VecDeque, | |
| sync::{Arc, Mutex, MutexGuard}, | |
| }; | |
| // One-liner that allows us to easily lock a Mutex while handling possible poison. | |
| fn safe_lock<T>(m: &Mutex<T>) -> MutexGuard<'_, T> { | |
| match m.lock() { | |
| Ok(guard) => guard, | |
| Err(poisoned) => poisoned.into_inner(), | |
| } | |
| } | |
| /// ThreadedDeque is a threadsafe VecDeque. | |
| pub struct ThreadedDeque<T> { | |
| inner: Arc<Mutex<VecDeque<T>>>, | |
| } | |
| impl<T> Clone for ThreadedDeque<T> { | |
| fn clone(&self) -> Self { | |
| Self { | |
| inner: Arc::clone(&self.inner), | |
| } | |
| } | |
| } | |
| impl<T> Default for ThreadedDeque<T> { | |
| fn default() -> Self { | |
| Self { | |
| inner: Arc::new(Mutex::new(VecDeque::new())), | |
| } | |
| } | |
| } | |
| #[allow(dead_code)] | |
| impl<T> ThreadedDeque<T> { | |
| pub fn new() -> Self { | |
| Self { | |
| inner: Arc::new(Mutex::new(VecDeque::new())), | |
| } | |
| } | |
| /// If you want to get the MutexGuard for the inner VecDeque<T> | |
| /// This is not needed to use any methods! | |
| pub fn lock(&self) -> MutexGuard<'_, VecDeque<T>> { | |
| safe_lock(&self.inner) | |
| } | |
| #[inline] | |
| pub fn len(&self) -> usize { | |
| safe_lock(&self.inner).len() | |
| } | |
| #[inline] | |
| pub fn is_empty(&self) -> bool { | |
| safe_lock(&self.inner).is_empty() | |
| } | |
| #[inline] | |
| pub fn push_back(&self, value: T) { | |
| safe_lock(&self.inner).push_back(value); | |
| } | |
| #[inline] | |
| pub fn pop_back(&self) -> Option<T> { | |
| safe_lock(&self.inner).pop_back() | |
| } | |
| #[inline] | |
| pub fn pop_front(&self) -> Option<T> { | |
| safe_lock(&self.inner).pop_front() | |
| } | |
| /// Returns a clone of the front. **DOES NOT MODIFY THE UNDERLYING VecDeque** | |
| #[inline] | |
| pub fn front(&self) -> Option<T> | |
| where | |
| T: Clone, | |
| { | |
| safe_lock(&self.inner).front().cloned() | |
| } | |
| } | |
| #[cfg(test)] | |
| mod tests { | |
| use std::{ | |
| panic::{self, RefUnwindSafe, UnwindSafe}, | |
| sync::{Arc, mpsc}, | |
| thread, | |
| }; | |
| use super::ThreadedDeque; | |
| // Runs a test `n_times` in a row. | |
| // failure_threshold : If this many runs fail this test willl fail. If 'failure_threshold' = 2, if 3 jobs fail, this job fails. | |
| fn run_test_n_times<F>( | |
| n_times: usize, | |
| failure_threshold: usize, | |
| log_job_info: bool, | |
| test_fn: F, | |
| ) where | |
| F: FnOnce() + Send + Sync + Clone + Copy + UnwindSafe + RefUnwindSafe + 'static, | |
| { | |
| let mut failed_iterations: Vec<(usize, String)> = Vec::new(); | |
| let thread_safe_test_fn = Arc::new(test_fn); | |
| for i in 0..n_times { | |
| if log_job_info { | |
| println!("\n--------------------- JOB {i} ---------------------"); | |
| } | |
| let thread_fn = Arc::clone(&thread_safe_test_fn); | |
| let (release_tx, release_rx) = mpsc::sync_channel(0); | |
| let release_sender = release_tx.clone(); | |
| let handle = thread::spawn(move || { | |
| let result = panic::catch_unwind(|| { | |
| thread_fn(); | |
| }); | |
| let _ = release_sender.send(result); | |
| }); | |
| let result = release_rx.recv().unwrap(); | |
| let _ = handle.join(); | |
| if let Err(e) = result { | |
| let msg = if let Some(s) = e.downcast_ref::<&str>() { | |
| s.to_string() | |
| } else if let Some(s) = e.downcast_ref::<String>() { | |
| s.clone() | |
| } else { | |
| "-".to_string() | |
| }; | |
| failed_iterations.push((i, msg)); | |
| } | |
| } | |
| if log_job_info && !failed_iterations.is_empty() { | |
| println!( | |
| "\n\n------------------------------------------ Failed Iterations ------------------------------------------\n\n{:#?}\n\n-------------------------------------------------------------------------------------------------------", | |
| failed_iterations | |
| ); | |
| } | |
| assert!( | |
| if failure_threshold == 0 { | |
| failed_iterations.is_empty() | |
| } else { | |
| failed_iterations.len() <= failure_threshold | |
| }, | |
| "expected this to fail at most {failure_threshold} times, instead it failed {}/{}", | |
| failed_iterations.len(), | |
| n_times | |
| ); | |
| } | |
| #[test] | |
| fn test_threaded_deque_push_pop_basic() { | |
| let deque = ThreadedDeque::new(); | |
| assert!(deque.is_empty()); | |
| assert_eq!(deque.len(), 0); | |
| deque.push_back(1); | |
| deque.push_back(2); | |
| assert!(!deque.is_empty()); | |
| assert_eq!(deque.len(), 2); | |
| assert_eq!(deque.pop_front(), Some(1)); | |
| assert_eq!(deque.pop_back(), Some(2)); | |
| assert!(deque.is_empty()); | |
| } | |
| #[test] | |
| fn test_threaded_deque_test_front_clone() { | |
| let deque = ThreadedDeque::new(); | |
| deque.push_back(42); | |
| let front = deque.front(); | |
| assert_eq!(front, Some(42)); | |
| // The deque itself is unchanged | |
| assert_eq!(deque.len(), 1); | |
| assert_eq!(deque.pop_front(), Some(42)); | |
| } | |
| #[test] | |
| fn test_threaded_deque_clone_shares_state() { | |
| let deque1 = ThreadedDeque::new(); | |
| let deque2 = deque1.clone(); | |
| deque1.push_back(10); | |
| // Both see the same element | |
| assert_eq!(deque2.front(), Some(10)); | |
| // Popping from one affects the other | |
| assert_eq!(deque2.pop_front(), Some(10)); | |
| assert!(deque1.is_empty()); | |
| } | |
| #[test] | |
| fn test_threaded_deque_concurrent_access() { | |
| let deque = ThreadedDeque::new(); | |
| let handles: Vec<_> = (0..10) | |
| .map(|i| { | |
| let dq = deque.clone(); | |
| thread::spawn(move || { | |
| dq.push_back(i); | |
| }) | |
| }) | |
| .collect(); | |
| for h in handles { | |
| h.join().unwrap(); | |
| } | |
| assert_eq!(deque.len(), 10); | |
| // Pop all values concurrently | |
| let handles: Vec<_> = (0..10) | |
| .map(|_| { | |
| let dq = deque.clone(); | |
| thread::spawn(move || dq.pop_front()) | |
| }) | |
| .collect(); | |
| let mut results = vec![]; | |
| for h in handles { | |
| if let Some(v) = h.join().unwrap() { | |
| results.push(v); | |
| } | |
| } | |
| results.sort(); | |
| assert_eq!(results, (0..10).collect::<Vec<_>>()); | |
| assert!(deque.is_empty()); | |
| } | |
| #[test] | |
| fn test_threaded_deque_empty_pop_front_back() { | |
| let deque: ThreadedDeque<i32> = ThreadedDeque::new(); | |
| assert_eq!(deque.pop_front(), None); | |
| assert_eq!(deque.pop_back(), None); | |
| assert_eq!(deque.front(), None); | |
| } | |
| #[test] | |
| fn test_threaded_deque_len_is_empty_consistency() { | |
| let deque = ThreadedDeque::new(); | |
| assert!(deque.is_empty()); | |
| assert_eq!(deque.len(), 0); | |
| deque.push_back(1); | |
| assert!(!deque.is_empty()); | |
| assert_eq!(deque.len(), 1); | |
| deque.pop_front(); | |
| assert!(deque.is_empty()); | |
| assert_eq!(deque.len(), 0); | |
| } | |
| #[test] | |
| fn test_threaded_deque_front_pop_and_front_race() { | |
| // Imagine you have one thread that loops and just calls pop_front. | |
| // You have anything thread that needs to check something is at front, | |
| // getting a reference to it, then uses that ref in some operation. | |
| // Suppose that operation could fail, which is why they only call | |
| // pop_front after 'that operation' was successful with the cloned front. | |
| // When they call pop_front they won't be popping what they think they are. | |
| // Due to the other thread constantly popping front, there's a race. | |
| // This test tests for that race. | |
| run_test_n_times(100, 0, false, || { | |
| let num_elements = 1000; | |
| let deque = ThreadedDeque::new(); | |
| for i in 0..num_elements { | |
| deque.push_back(i); | |
| } | |
| // thread that checks front, then if it is Some, pops front. | |
| // In between checking the front, and popping the front, it is | |
| // possible that another thread could have popped prior... | |
| let check_front_deque = deque.clone(); | |
| let checker_thread = thread::spawn(move || { | |
| // To combat this, acquire the lock. | |
| let mut deque_guard = check_front_deque.lock(); | |
| while !deque_guard.is_empty() { | |
| if let Some(got) = deque_guard.front().cloned() { | |
| let popped = deque_guard.pop_front(); | |
| assert_eq!(got, popped.unwrap()); | |
| } | |
| } | |
| }); | |
| let pop_deque = deque.clone(); | |
| let popper_thread = thread::spawn(move || { | |
| while !pop_deque.is_empty() { | |
| let _ = pop_deque.pop_front(); | |
| } | |
| }); | |
| checker_thread.join().unwrap(); | |
| popper_thread.join().unwrap(); | |
| }); | |
| } | |
| #[test] | |
| #[should_panic] | |
| fn test_threaded_deque_front_pop_front_race_should_panic() { | |
| // Imagine you have one thread that loops and just calls pop_front. | |
| // You have anything thread that needs to check something is at front, | |
| // getting a reference to it, then uses that ref in some operation. | |
| // Suppose that operation could fail, which is why they only call | |
| // pop_front after 'that operation' was successful with the cloned front. | |
| // When they call pop_front they won't be popping what they think they are. | |
| // Due to the other thread constantly popping front, there's a race. | |
| // This test tests for that race. | |
| run_test_n_times(10, 0, false, || { | |
| let num_elements = 1000; | |
| let deque = ThreadedDeque::new(); | |
| for i in 0..num_elements { | |
| deque.push_back(i); | |
| } | |
| // thread that checks front, then if it is Some, pops front. | |
| // In between checking the front, and popping the front, it is | |
| // possible that another thread could have popped prior... | |
| let check_front_deque = deque.clone(); | |
| let checker_thread = thread::spawn(move || { | |
| while !check_front_deque.is_empty() { | |
| if let Some(got) = check_front_deque.front() { | |
| let popped = check_front_deque.pop_front(); | |
| //assert_eq!(got, popped.unwrap()); | |
| if got != popped.unwrap() { | |
| panic!("race detected!!!"); | |
| } | |
| } | |
| } | |
| }); | |
| let pop_deque = deque.clone(); | |
| let popper_thread = thread::spawn(move || { | |
| while !pop_deque.is_empty() { | |
| let _ = pop_deque.pop_front(); | |
| } | |
| }); | |
| checker_thread.join().unwrap(); | |
| popper_thread.join().unwrap(); | |
| }); | |
| } | |
| #[test] | |
| fn test_threaded_deque_lock() { | |
| let deque = ThreadedDeque::new(); | |
| let element = 1; | |
| deque.push_back(element); | |
| let mut lock = deque.lock(); | |
| let popped = lock.pop_front(); | |
| drop(lock); | |
| assert!(deque.is_empty()); | |
| assert_eq!(popped.unwrap(), element); | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment