Skip to content

Instantly share code, notes, and snippets.

@matthewoestreich
Last active November 10, 2025 17:36
Show Gist options
  • Save matthewoestreich/a62e93dff414512f5db41b210df0f088 to your computer and use it in GitHub Desktop.
Save matthewoestreich/a62e93dff414512f5db41b210df0f088 to your computer and use it in GitHub Desktop.
Thread safe VecDeque
pub mod threadsafe {
use std::{
collections::VecDeque,
sync::{Arc, Mutex, MutexGuard},
};
// One-liner that allows us to easily lock a Mutex while handling possible poison.
fn safe_lock<T>(m: &Mutex<T>) -> MutexGuard<'_, T> {
match m.lock() {
Ok(guard) => guard,
Err(poisoned) => poisoned.into_inner(),
}
}
/// ThreadedDeque is a threadsafe VecDeque.
pub struct ThreadedDeque<T> {
inner: Arc<Mutex<VecDeque<T>>>,
}
impl<T> Clone for ThreadedDeque<T> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl<T> Default for ThreadedDeque<T> {
fn default() -> Self {
Self {
inner: Arc::new(Mutex::new(VecDeque::new())),
}
}
}
#[allow(dead_code)]
impl<T> ThreadedDeque<T> {
pub fn new() -> Self {
Self {
inner: Arc::new(Mutex::new(VecDeque::new())),
}
}
/// If you want to get the MutexGuard for the inner VecDeque<T>
/// This is not needed to use any methods!
pub fn lock(&self) -> MutexGuard<'_, VecDeque<T>> {
safe_lock(&self.inner)
}
#[inline]
pub fn len(&self) -> usize {
safe_lock(&self.inner).len()
}
#[inline]
pub fn is_empty(&self) -> bool {
safe_lock(&self.inner).is_empty()
}
#[inline]
pub fn push_back(&self, value: T) {
safe_lock(&self.inner).push_back(value);
}
#[inline]
pub fn pop_back(&self) -> Option<T> {
safe_lock(&self.inner).pop_back()
}
#[inline]
pub fn pop_front(&self) -> Option<T> {
safe_lock(&self.inner).pop_front()
}
/// Returns a clone of the front. **DOES NOT MODIFY THE UNDERLYING VecDeque**
#[inline]
pub fn front(&self) -> Option<T>
where
T: Clone,
{
safe_lock(&self.inner).front().cloned()
}
}
#[cfg(test)]
mod tests {
use std::{
panic::{self, RefUnwindSafe, UnwindSafe},
sync::{Arc, mpsc},
thread,
};
use super::ThreadedDeque;
// Runs a test `n_times` in a row.
// failure_threshold : If this many runs fail this test willl fail. If 'failure_threshold' = 2, if 3 jobs fail, this job fails.
fn run_test_n_times<F>(
n_times: usize,
failure_threshold: usize,
log_job_info: bool,
test_fn: F,
) where
F: FnOnce() + Send + Sync + Clone + Copy + UnwindSafe + RefUnwindSafe + 'static,
{
let mut failed_iterations: Vec<(usize, String)> = Vec::new();
let thread_safe_test_fn = Arc::new(test_fn);
for i in 0..n_times {
if log_job_info {
println!("\n--------------------- JOB {i} ---------------------");
}
let thread_fn = Arc::clone(&thread_safe_test_fn);
let (release_tx, release_rx) = mpsc::sync_channel(0);
let release_sender = release_tx.clone();
let handle = thread::spawn(move || {
let result = panic::catch_unwind(|| {
thread_fn();
});
let _ = release_sender.send(result);
});
let result = release_rx.recv().unwrap();
let _ = handle.join();
if let Err(e) = result {
let msg = if let Some(s) = e.downcast_ref::<&str>() {
s.to_string()
} else if let Some(s) = e.downcast_ref::<String>() {
s.clone()
} else {
"-".to_string()
};
failed_iterations.push((i, msg));
}
}
if log_job_info && !failed_iterations.is_empty() {
println!(
"\n\n------------------------------------------ Failed Iterations ------------------------------------------\n\n{:#?}\n\n-------------------------------------------------------------------------------------------------------",
failed_iterations
);
}
assert!(
if failure_threshold == 0 {
failed_iterations.is_empty()
} else {
failed_iterations.len() <= failure_threshold
},
"expected this to fail at most {failure_threshold} times, instead it failed {}/{}",
failed_iterations.len(),
n_times
);
}
#[test]
fn test_threaded_deque_push_pop_basic() {
let deque = ThreadedDeque::new();
assert!(deque.is_empty());
assert_eq!(deque.len(), 0);
deque.push_back(1);
deque.push_back(2);
assert!(!deque.is_empty());
assert_eq!(deque.len(), 2);
assert_eq!(deque.pop_front(), Some(1));
assert_eq!(deque.pop_back(), Some(2));
assert!(deque.is_empty());
}
#[test]
fn test_threaded_deque_test_front_clone() {
let deque = ThreadedDeque::new();
deque.push_back(42);
let front = deque.front();
assert_eq!(front, Some(42));
// The deque itself is unchanged
assert_eq!(deque.len(), 1);
assert_eq!(deque.pop_front(), Some(42));
}
#[test]
fn test_threaded_deque_clone_shares_state() {
let deque1 = ThreadedDeque::new();
let deque2 = deque1.clone();
deque1.push_back(10);
// Both see the same element
assert_eq!(deque2.front(), Some(10));
// Popping from one affects the other
assert_eq!(deque2.pop_front(), Some(10));
assert!(deque1.is_empty());
}
#[test]
fn test_threaded_deque_concurrent_access() {
let deque = ThreadedDeque::new();
let handles: Vec<_> = (0..10)
.map(|i| {
let dq = deque.clone();
thread::spawn(move || {
dq.push_back(i);
})
})
.collect();
for h in handles {
h.join().unwrap();
}
assert_eq!(deque.len(), 10);
// Pop all values concurrently
let handles: Vec<_> = (0..10)
.map(|_| {
let dq = deque.clone();
thread::spawn(move || dq.pop_front())
})
.collect();
let mut results = vec![];
for h in handles {
if let Some(v) = h.join().unwrap() {
results.push(v);
}
}
results.sort();
assert_eq!(results, (0..10).collect::<Vec<_>>());
assert!(deque.is_empty());
}
#[test]
fn test_threaded_deque_empty_pop_front_back() {
let deque: ThreadedDeque<i32> = ThreadedDeque::new();
assert_eq!(deque.pop_front(), None);
assert_eq!(deque.pop_back(), None);
assert_eq!(deque.front(), None);
}
#[test]
fn test_threaded_deque_len_is_empty_consistency() {
let deque = ThreadedDeque::new();
assert!(deque.is_empty());
assert_eq!(deque.len(), 0);
deque.push_back(1);
assert!(!deque.is_empty());
assert_eq!(deque.len(), 1);
deque.pop_front();
assert!(deque.is_empty());
assert_eq!(deque.len(), 0);
}
#[test]
fn test_threaded_deque_front_pop_and_front_race() {
// Imagine you have one thread that loops and just calls pop_front.
// You have anything thread that needs to check something is at front,
// getting a reference to it, then uses that ref in some operation.
// Suppose that operation could fail, which is why they only call
// pop_front after 'that operation' was successful with the cloned front.
// When they call pop_front they won't be popping what they think they are.
// Due to the other thread constantly popping front, there's a race.
// This test tests for that race.
run_test_n_times(100, 0, false, || {
let num_elements = 1000;
let deque = ThreadedDeque::new();
for i in 0..num_elements {
deque.push_back(i);
}
// thread that checks front, then if it is Some, pops front.
// In between checking the front, and popping the front, it is
// possible that another thread could have popped prior...
let check_front_deque = deque.clone();
let checker_thread = thread::spawn(move || {
// To combat this, acquire the lock.
let mut deque_guard = check_front_deque.lock();
while !deque_guard.is_empty() {
if let Some(got) = deque_guard.front().cloned() {
let popped = deque_guard.pop_front();
assert_eq!(got, popped.unwrap());
}
}
});
let pop_deque = deque.clone();
let popper_thread = thread::spawn(move || {
while !pop_deque.is_empty() {
let _ = pop_deque.pop_front();
}
});
checker_thread.join().unwrap();
popper_thread.join().unwrap();
});
}
#[test]
#[should_panic]
fn test_threaded_deque_front_pop_front_race_should_panic() {
// Imagine you have one thread that loops and just calls pop_front.
// You have anything thread that needs to check something is at front,
// getting a reference to it, then uses that ref in some operation.
// Suppose that operation could fail, which is why they only call
// pop_front after 'that operation' was successful with the cloned front.
// When they call pop_front they won't be popping what they think they are.
// Due to the other thread constantly popping front, there's a race.
// This test tests for that race.
run_test_n_times(10, 0, false, || {
let num_elements = 1000;
let deque = ThreadedDeque::new();
for i in 0..num_elements {
deque.push_back(i);
}
// thread that checks front, then if it is Some, pops front.
// In between checking the front, and popping the front, it is
// possible that another thread could have popped prior...
let check_front_deque = deque.clone();
let checker_thread = thread::spawn(move || {
while !check_front_deque.is_empty() {
if let Some(got) = check_front_deque.front() {
let popped = check_front_deque.pop_front();
//assert_eq!(got, popped.unwrap());
if got != popped.unwrap() {
panic!("race detected!!!");
}
}
}
});
let pop_deque = deque.clone();
let popper_thread = thread::spawn(move || {
while !pop_deque.is_empty() {
let _ = pop_deque.pop_front();
}
});
checker_thread.join().unwrap();
popper_thread.join().unwrap();
});
}
#[test]
fn test_threaded_deque_lock() {
let deque = ThreadedDeque::new();
let element = 1;
deque.push_back(element);
let mut lock = deque.lock();
let popped = lock.pop_front();
drop(lock);
assert!(deque.is_empty());
assert_eq!(popped.unwrap(), element);
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment