Created
August 12, 2023 07:45
-
-
Save trvswgnr/21f6747f52631ca0a520c3a9f4b422f3 to your computer and use it in GitHub Desktop.
A concurrent FIFO queue built on a custom doubly-linked list in CrabLang
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use crate::linkedlist::LinkedList; | |
use std::sync::{Arc, RwLock}; | |
pub struct Queue<T> { | |
queue: RwLock<LinkedList<T>>, | |
} | |
impl<T> Queue<T> { | |
pub fn new() -> Arc<Self> { | |
Arc::new(Queue { | |
queue: RwLock::new(LinkedList::new()), | |
}) | |
} | |
pub fn enqueue(&self, item: T) { | |
let mut queue = self.queue.write().unwrap(); | |
queue.push_back(item); | |
} | |
pub fn dequeue(&self) -> Option<T> { | |
let mut queue = self.queue.write().unwrap(); | |
queue.pop_front() | |
} | |
pub fn is_empty(&self) -> bool { | |
let queue = self.queue.read().unwrap(); | |
queue.is_empty() | |
} | |
pub fn size(&self) -> usize { | |
let queue = self.queue.read().unwrap(); | |
queue.len() | |
} | |
pub fn clear(&self) { | |
let mut queue = self.queue.write().unwrap(); | |
queue.clear(); | |
} | |
pub fn peek(&self) -> Option<T> { | |
let queue = self.queue.read().unwrap(); | |
unsafe { queue.peek_front().map(|item| std::ptr::read(item)) } | |
} | |
} | |
#[cfg(test)]
mod method_tests {
    use super::*;
    use std::thread;
    use std::time::Duration;

    /// Pauses the calling thread for a random 1-10 ms so that the worker
    /// threads interleave their queue operations.
    fn jitter() {
        thread::sleep(Duration::from_millis(rand::random::<u64>() % 10 + 1));
    }

    #[test]
    fn test_queue_new() {
        let q = Queue::<i32>::new();
        assert!(q.is_empty());
        assert_eq!(q.size(), 0);
    }

    #[test]
    fn test_queue_enqueue() {
        let q = Queue::new();
        q.enqueue(1);
        assert_eq!(q.size(), 1);
    }

    #[test]
    fn test_queue_dequeue() {
        let q = Queue::new();
        q.enqueue(1);
        assert_eq!(q.dequeue(), Some(1));
        assert_eq!(q.size(), 0);
        assert_eq!(q.dequeue(), None);
    }

    #[test]
    fn test_queue_is_empty() {
        let q = Queue::new();
        assert!(q.is_empty());
        q.enqueue(1);
        assert!(!q.is_empty());
    }

    #[test]
    fn test_queue_size() {
        let q = Queue::new();
        assert_eq!(q.size(), 0);
        q.enqueue(1);
        assert_eq!(q.size(), 1);
    }

    #[test]
    fn test_queue_clear() {
        let q = Queue::new();
        for value in 1..=3 {
            q.enqueue(value);
        }
        assert_eq!(q.size(), 3);
        q.clear();
        assert!(q.is_empty());
    }

    #[test]
    fn test_queue_peek() {
        let q = Queue::new();
        for value in 1..=3 {
            q.enqueue(value);
        }
        // Peeking must report the front item and leave the queue untouched.
        assert_eq!(q.peek(), Some(1));
        assert_eq!(q.size(), 3);
    }

    #[test]
    fn test_queue_all_methods_same_time() {
        let queue = Queue::new();
        let mut handles = Vec::new();

        for _ in 0..10 {
            let producer = {
                let q = queue.clone();
                thread::spawn(move || {
                    for i in 0..1000 {
                        jitter();
                        q.enqueue(i);
                    }
                })
            };

            let consumer = {
                let q = queue.clone();
                thread::spawn(move || {
                    for _ in 0..999 {
                        jitter();
                        q.dequeue();
                    }
                })
            };

            let emptiness_checker = {
                let q = queue.clone();
                thread::spawn(move || {
                    // This worker waits once up front, then polls back to back.
                    jitter();
                    for _ in 0..1000 {
                        q.is_empty();
                    }
                })
            };

            let size_checker = {
                let q = queue.clone();
                thread::spawn(move || {
                    for _ in 0..1000 {
                        jitter();
                        q.size();
                    }
                })
            };

            let peeker = {
                let q = queue.clone();
                thread::spawn(move || {
                    for _ in 0..1000 {
                        jitter();
                        q.peek();
                    }
                })
            };

            handles.push(producer);
            handles.push(consumer);
            handles.push(emptiness_checker);
            handles.push(size_checker);
            handles.push(peeker);
        }

        for handle in handles {
            handle.join().unwrap();
        }

        // Each of the 10 rounds enqueues 1000 items but attempts only 999
        // dequeues, so at least 10 items must be left over.
        assert!(!queue.is_empty());
    }
}
#[cfg(test)]
mod basic_tests {
    use super::*;
    use std::thread;

    #[test]
    fn test_queue_with_str_slice_data_type() {
        let words = ["hello", "darkness", "my", "old", "friend"];
        let queue = Queue::new();
        for &word in &words {
            queue.enqueue(word);
        }
        assert_eq!(queue.size(), 5);
        // Items must come back out in insertion order.
        for &word in &words {
            assert_eq!(queue.dequeue(), Some(word));
        }
        assert!(queue.is_empty());
    }

    #[test]
    fn test_queue_with_struct_data_type() {
        #[derive(Debug, PartialEq)]
        struct Person {
            name: String,
            age: u8,
        }

        let person = |name: &str, age: u8| Person {
            name: name.to_string(),
            age,
        };

        let queue = Queue::new();
        queue.enqueue(person("John", 32));
        queue.enqueue(person("Jane", 28));
        assert_eq!(queue.size(), 2);
        assert_eq!(queue.dequeue(), Some(person("John", 32)));
        assert_eq!(queue.dequeue(), Some(person("Jane", 28)));
        assert!(queue.is_empty());
    }

    #[test]
    fn test_concurrent_queue() {
        let queue = Queue::new();
        let workers: Vec<_> = (0..10)
            .map(|_| {
                let shared = queue.clone();
                thread::spawn(move || {
                    for i in 0..1000 {
                        shared.enqueue(i);
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }
        // 10 threads x 1000 enqueues each.
        assert_eq!(queue.size(), 10000);
    }

    #[test]
    fn test_concurrent_queue_with_dequeue() {
        let queue = Queue::new();
        (0..1000).for_each(|i| queue.enqueue(i));

        let workers: Vec<_> = (0..10)
            .map(|_| {
                let shared = queue.clone();
                thread::spawn(move || {
                    for _ in 0..100 {
                        shared.dequeue();
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }
        // 10 threads x 100 dequeues drain all 1000 items.
        assert_eq!(queue.size(), 0);
    }

    #[test]
    fn test_queue_under_load() {
        let queue = Queue::new();
        (0..1_000_000).for_each(|i| queue.enqueue(i));
        assert_eq!(queue.size(), 1_000_000);
        (0..1_000_000).for_each(|_| {
            queue.dequeue();
        });
        assert_eq!(queue.size(), 0);
        assert!(queue.is_empty());
    }

    #[test]
    fn test_concurrent_enqueue_dequeue() {
        let queue = Queue::new();
        let num_threads = 100;
        let num_operations = 1000;

        let workers: Vec<_> = (0..num_threads)
            .map(|_| {
                let shared = queue.clone();
                thread::spawn(move || {
                    for i in 0..num_operations {
                        shared.enqueue(i);
                        shared.dequeue();
                    }
                })
            })
            .collect();
        for worker in workers {
            worker.join().unwrap();
        }

        // Every thread pairs each enqueue with a dequeue, so nothing may
        // be left once all threads have finished.
        assert!(queue.is_empty());
        assert_eq!(queue.size(), 0);
    }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::marker::PhantomData; | |
use std::ptr::NonNull; | |
pub struct LinkedList<T> { | |
head: Option<NonNull<Node<T>>>, | |
tail: Option<NonNull<Node<T>>>, | |
len: usize, | |
marker: PhantomData<Box<Node<T>>>, | |
} | |
/// One link of a [`LinkedList`]: the stored value plus pointers to both
/// neighbours.
pub struct Node<T> {
    /// The value stored in this node.
    element: T,
    /// The following node, or `None` if this is the last node.
    next: Option<NonNull<Node<T>>>,
    /// The preceding node, or `None` if this is the first node.
    prev: Option<NonNull<Node<T>>>,
}
pub struct Iter<'a, T> { | |
next: Option<NonNull<Node<T>>>, | |
marker: PhantomData<&'a Node<T>>, | |
} | |
pub struct IterMut<'a, T> { | |
next: Option<NonNull<Node<T>>>, | |
marker: PhantomData<&'a mut Node<T>>, | |
} | |
pub struct IntoIter<T> { | |
next: Option<NonNull<Node<T>>>, | |
} | |
impl<T> LinkedList<T> { | |
pub fn new() -> Self { | |
LinkedList { | |
head: None, | |
tail: None, | |
len: 0, | |
marker: PhantomData, | |
} | |
} | |
pub fn push_front(&mut self, element: T) { | |
let new_node = Box::new(Node { | |
next: None, | |
prev: None, | |
element, | |
}); | |
unsafe { | |
let mut new_node = NonNull::new_unchecked(Box::into_raw(new_node)); | |
match self.head { | |
None => self.tail = Some(new_node), | |
Some(mut old_head) => { | |
old_head.as_mut().prev = Some(new_node); | |
new_node.as_mut().next = Some(old_head); | |
} | |
} | |
self.head = Some(new_node); | |
self.len += 1; | |
} | |
} | |
pub fn push_back(&mut self, element: T) { | |
let new_node = Box::new(Node { | |
next: None, | |
prev: None, | |
element, | |
}); | |
unsafe { | |
let mut new_node = NonNull::new_unchecked(Box::into_raw(new_node)); | |
match self.tail { | |
None => self.head = Some(new_node), | |
Some(mut old_tail) => { | |
old_tail.as_mut().next = Some(new_node); | |
new_node.as_mut().prev = Some(old_tail); | |
} | |
} | |
self.tail = Some(new_node); | |
self.len += 1; | |
} | |
} | |
pub fn pop_front(&mut self) -> Option<T> { | |
self.head.map(|old_head| unsafe { | |
let old_head = Box::from_raw(old_head.as_ptr()); | |
self.head = old_head.next; | |
if let Some(mut head) = self.head { | |
head.as_mut().prev = None; | |
} else { | |
self.tail = None; | |
} | |
self.len -= 1; | |
old_head.element | |
}) | |
} | |
pub fn pop_back(&mut self) -> Option<T> { | |
self.tail.map(|old_tail| unsafe { | |
let old_tail = Box::from_raw(old_tail.as_ptr()); | |
self.tail = old_tail.prev; | |
if let Some(mut tail) = self.tail { | |
tail.as_mut().next = None; | |
} else { | |
self.head = None; | |
} | |
self.len -= 1; | |
old_tail.element | |
}) | |
} | |
pub fn peek_front(&self) -> Option<&T> { | |
self.head.map(|head| unsafe { &head.as_ref().element }) | |
} | |
pub fn peek_front_mut(&mut self) -> Option<&mut T> { | |
self.head | |
.map(|mut head| unsafe { &mut head.as_mut().element }) | |
} | |
pub fn peek_back(&self) -> Option<&T> { | |
self.tail.map(|tail| unsafe { &tail.as_ref().element }) | |
} | |
pub fn peek_back_mut(&mut self) -> Option<&mut T> { | |
self.tail | |
.map(|mut tail| unsafe { &mut tail.as_mut().element }) | |
} | |
pub fn iter(&self) -> Iter<'_, T> { | |
Iter { | |
next: self.head, | |
marker: PhantomData, | |
} | |
} | |
pub fn iter_mut(&mut self) -> IterMut<'_, T> { | |
IterMut { | |
next: self.head, | |
marker: PhantomData, | |
} | |
} | |
pub fn len(&self) -> usize { | |
self.len | |
} | |
pub fn is_empty(&self) -> bool { | |
self.len == 0 | |
} | |
pub fn clear(&mut self) { | |
*self = Self::new(); | |
} | |
} | |
impl<T> Drop for LinkedList<T> { | |
fn drop(&mut self) { | |
while self.pop_front().is_some() {} | |
} | |
} | |
impl<'a, T> Iterator for Iter<'a, T> { | |
type Item = &'a T; | |
fn next(&mut self) -> Option<Self::Item> { | |
self.next.map(|node| unsafe { | |
let node = &*node.as_ptr(); | |
self.next = node.next; | |
&node.element | |
}) | |
} | |
} | |
impl<'a, T> Iterator for IterMut<'a, T> { | |
type Item = &'a mut T; | |
fn next(&mut self) -> Option<Self::Item> { | |
self.next.map(|node| unsafe { | |
let node = &mut *node.as_ptr(); | |
self.next = node.next; | |
&mut node.element | |
}) | |
} | |
} | |
impl<T> IntoIterator for LinkedList<T> { | |
type Item = T; | |
type IntoIter = IntoIter<T>; | |
fn into_iter(mut self) -> Self::IntoIter { | |
let next = self.head.take(); | |
IntoIter { next } | |
} | |
} | |
impl<T> Iterator for IntoIter<T> { | |
type Item = T; | |
fn next(&mut self) -> Option<Self::Item> { | |
self.next.map(|node| unsafe { | |
let node = Box::from_raw(node.as_ptr()); | |
self.next = node.next; | |
node.element | |
}) | |
} | |
} | |
unsafe impl<T: Send> Send for LinkedList<T> {} | |
unsafe impl<T: Sync> Sync for LinkedList<T> {} | |
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a list holding 1, 2, 3 from front to back using `push_back`.
    fn ascending() -> LinkedList<i32> {
        let mut list = LinkedList::new();
        for value in 1..=3 {
            list.push_back(value);
        }
        list
    }

    #[test]
    fn test_new() {
        let list: LinkedList<i32> = LinkedList::new();
        assert!(list.is_empty());
        assert_eq!(list.len(), 0);
    }

    #[test]
    fn test_push_front() {
        let mut list = LinkedList::new();
        for value in 1..=3 {
            list.push_front(value);
        }
        assert_eq!(list.len(), 3);
        // Front insertion reverses the order in which values come back out.
        for &expected in &[Some(3), Some(2), Some(1), None] {
            assert_eq!(list.pop_front(), expected);
        }
        assert!(list.is_empty());
    }

    #[test]
    fn test_push_back() {
        let mut list = ascending();
        assert_eq!(list.len(), 3);
        for &expected in &[Some(1), Some(2), Some(3), None] {
            assert_eq!(list.pop_front(), expected);
        }
        assert!(list.is_empty());
    }

    #[test]
    fn test_pop_front() {
        let mut list = ascending();
        for &expected in &[Some(1), Some(2), Some(3), None] {
            assert_eq!(list.pop_front(), expected);
        }
        assert!(list.is_empty());
    }

    #[test]
    fn test_pop_back() {
        let mut list = ascending();
        for &expected in &[Some(3), Some(2), Some(1), None] {
            assert_eq!(list.pop_back(), expected);
        }
        assert!(list.is_empty());
    }

    #[test]
    fn test_iter() {
        let list = ascending();
        let mut seen = list.iter();
        for expected in &[1, 2, 3] {
            assert_eq!(seen.next(), Some(expected));
        }
        assert_eq!(seen.next(), None);
    }

    #[test]
    fn test_into_iter() {
        let mut owned = ascending().into_iter();
        for &expected in &[Some(1), Some(2), Some(3), None] {
            assert_eq!(owned.next(), expected);
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment