Skip to content

Instantly share code, notes, and snippets.

@trvswgnr
Created August 12, 2023 07:45
Show Gist options
  • Save trvswgnr/21f6747f52631ca0a520c3a9f4b422f3 to your computer and use it in GitHub Desktop.
Save trvswgnr/21f6747f52631ca0a520c3a9f4b422f3 to your computer and use it in GitHub Desktop.
a concurrent fifo queue with a custom doubly-linked list in crablang
use crate::linkedlist::LinkedList;
use std::sync::{Arc, RwLock};
pub struct Queue<T> {
queue: RwLock<LinkedList<T>>,
}
impl<T> Queue<T> {
pub fn new() -> Arc<Self> {
Arc::new(Queue {
queue: RwLock::new(LinkedList::new()),
})
}
pub fn enqueue(&self, item: T) {
let mut queue = self.queue.write().unwrap();
queue.push_back(item);
}
pub fn dequeue(&self) -> Option<T> {
let mut queue = self.queue.write().unwrap();
queue.pop_front()
}
pub fn is_empty(&self) -> bool {
let queue = self.queue.read().unwrap();
queue.is_empty()
}
pub fn size(&self) -> usize {
let queue = self.queue.read().unwrap();
queue.len()
}
pub fn clear(&self) {
let mut queue = self.queue.write().unwrap();
queue.clear();
}
pub fn peek(&self) -> Option<T> {
let queue = self.queue.read().unwrap();
unsafe { queue.peek_front().map(|item| std::ptr::read(item)) }
}
}
#[cfg(test)]
mod method_tests {
use super::*;
use std::thread;
#[test]
fn test_queue_new() {
let queue = Queue::<i32>::new();
assert!(queue.is_empty());
assert_eq!(queue.size(), 0);
}
#[test]
fn test_queue_enqueue() {
let queue = Queue::new();
queue.enqueue(1);
assert_eq!(queue.size(), 1);
}
#[test]
fn test_queue_dequeue() {
let queue = Queue::new();
queue.enqueue(1);
assert_eq!(queue.dequeue(), Some(1));
assert_eq!(queue.size(), 0);
assert_eq!(queue.dequeue(), None);
}
#[test]
fn test_queue_is_empty() {
let queue = Queue::new();
assert!(queue.is_empty());
queue.enqueue(1);
assert!(!queue.is_empty());
}
#[test]
fn test_queue_size() {
let queue = Queue::new();
assert_eq!(queue.size(), 0);
queue.enqueue(1);
assert_eq!(queue.size(), 1);
}
#[test]
fn test_queue_clear() {
let queue = Queue::new();
queue.enqueue(1);
queue.enqueue(2);
queue.enqueue(3);
assert_eq!(queue.size(), 3);
queue.clear();
assert!(queue.is_empty());
}
#[test]
fn test_queue_peek() {
let queue = Queue::new();
queue.enqueue(1);
queue.enqueue(2);
queue.enqueue(3);
assert_eq!(queue.peek(), Some(1));
assert_eq!(queue.size(), 3);
}
#[test]
fn test_queue_all_methods_same_time() {
let queue = Queue::new();
let mut handles = vec![];
for _ in 0..10 {
let queue_clone = queue.clone();
let handle_1 = thread::spawn(move || {
for i in 0..1000 {
// wait for 1-10 ms
thread::sleep(std::time::Duration::from_millis(
rand::random::<u64>() % 10 + 1,
));
queue_clone.enqueue(i);
}
});
let queue_clone = queue.clone();
let handle_2 = thread::spawn(move || {
for _ in 0..999 {
// wait for 1-10 ms
thread::sleep(std::time::Duration::from_millis(
rand::random::<u64>() % 10 + 1,
));
queue_clone.dequeue();
}
});
let queue_clone = queue.clone();
let handle_3 = thread::spawn(move || {
// wait for 1-10 ms
thread::sleep(std::time::Duration::from_millis(
rand::random::<u64>() % 10 + 1,
));
for _ in 0..1000 {
queue_clone.is_empty();
}
});
let queue_clone = queue.clone();
let handle_4 = thread::spawn(move || {
for _ in 0..1000 {
// wait for 1-10 ms
thread::sleep(std::time::Duration::from_millis(
rand::random::<u64>() % 10 + 1,
));
queue_clone.size();
}
});
let queue_clone = queue.clone();
let handle_5 = thread::spawn(move || {
for _ in 0..1000 {
// wait for 1-10 ms
thread::sleep(std::time::Duration::from_millis(
rand::random::<u64>() % 10 + 1,
));
queue_clone.peek();
}
});
handles.push(handle_1);
handles.push(handle_2);
handles.push(handle_3);
handles.push(handle_4);
handles.push(handle_5);
}
for handle in handles {
handle.join().unwrap();
}
// the queue should not be empty, because we have
// enqueued 1000 items and dequeued 999 items
assert!(!queue.is_empty());
}
}
#[cfg(test)]
mod basic_tests {
use super::*;
use std::thread;
#[test]
fn test_queue_with_str_slice_data_type() {
let queue = Queue::new();
queue.enqueue("hello");
queue.enqueue("darkness");
queue.enqueue("my");
queue.enqueue("old");
queue.enqueue("friend");
assert_eq!(queue.size(), 5);
assert_eq!(queue.dequeue(), Some("hello"));
assert_eq!(queue.dequeue(), Some("darkness"));
assert_eq!(queue.dequeue(), Some("my"));
assert_eq!(queue.dequeue(), Some("old"));
assert_eq!(queue.dequeue(), Some("friend"));
assert!(queue.is_empty());
}
#[test]
fn test_queue_with_struct_data_type() {
#[derive(Debug, PartialEq)]
struct Person {
name: String,
age: u8,
}
let queue = Queue::new();
queue.enqueue(Person {
name: "John".to_string(),
age: 32,
});
queue.enqueue(Person {
name: "Jane".to_string(),
age: 28,
});
assert_eq!(queue.size(), 2);
assert_eq!(
queue.dequeue(),
Some(Person {
name: "John".to_string(),
age: 32,
})
);
assert_eq!(
queue.dequeue(),
Some(Person {
name: "Jane".to_string(),
age: 28,
})
);
assert!(queue.is_empty());
}
#[test]
fn test_concurrent_queue() {
let queue = Queue::new();
let mut handles = vec![];
for _ in 0..10 {
let queue_clone = queue.clone();
let handle = thread::spawn(move || {
for i in 0..1000 {
queue_clone.enqueue(i);
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
assert_eq!(queue.size(), 10000);
}
#[test]
fn test_concurrent_queue_with_dequeue() {
let queue = Queue::new();
for i in 0..1000 {
queue.enqueue(i);
}
let mut handles = vec![];
for _ in 0..10 {
let queue_clone = queue.clone();
let handle = thread::spawn(move || {
for _ in 0..100 {
queue_clone.dequeue();
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
assert_eq!(queue.size(), 0);
}
#[test]
fn test_queue_under_load() {
let queue = Queue::new();
for i in 0..1_000_000 {
queue.enqueue(i);
}
assert_eq!(queue.size(), 1_000_000);
for _ in 0..1_000_000 {
queue.dequeue();
}
assert_eq!(queue.size(), 0);
assert!(queue.is_empty());
}
#[test]
fn test_concurrent_enqueue_dequeue() {
let queue = Queue::new();
let num_threads = 100;
let num_operations = 1000;
let mut handles = vec![];
for _ in 0..num_threads {
let queue_clone = queue.clone();
let handle = thread::spawn(move || {
for i in 0..num_operations {
queue_clone.enqueue(i);
queue_clone.dequeue();
}
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
// since each thread enqueues and dequeues the same number of items,
// the queue should be empty at the end.
assert!(queue.is_empty());
assert_eq!(queue.size(), 0);
}
}
use std::marker::PhantomData;
use std::ptr::NonNull;
pub struct LinkedList<T> {
head: Option<NonNull<Node<T>>>,
tail: Option<NonNull<Node<T>>>,
len: usize,
marker: PhantomData<Box<Node<T>>>,
}
pub struct Node<T> {
next: Option<NonNull<Node<T>>>,
prev: Option<NonNull<Node<T>>>,
element: T,
}
pub struct Iter<'a, T> {
next: Option<NonNull<Node<T>>>,
marker: PhantomData<&'a Node<T>>,
}
pub struct IterMut<'a, T> {
next: Option<NonNull<Node<T>>>,
marker: PhantomData<&'a mut Node<T>>,
}
pub struct IntoIter<T> {
next: Option<NonNull<Node<T>>>,
}
impl<T> LinkedList<T> {
pub fn new() -> Self {
LinkedList {
head: None,
tail: None,
len: 0,
marker: PhantomData,
}
}
pub fn push_front(&mut self, element: T) {
let new_node = Box::new(Node {
next: None,
prev: None,
element,
});
unsafe {
let mut new_node = NonNull::new_unchecked(Box::into_raw(new_node));
match self.head {
None => self.tail = Some(new_node),
Some(mut old_head) => {
old_head.as_mut().prev = Some(new_node);
new_node.as_mut().next = Some(old_head);
}
}
self.head = Some(new_node);
self.len += 1;
}
}
pub fn push_back(&mut self, element: T) {
let new_node = Box::new(Node {
next: None,
prev: None,
element,
});
unsafe {
let mut new_node = NonNull::new_unchecked(Box::into_raw(new_node));
match self.tail {
None => self.head = Some(new_node),
Some(mut old_tail) => {
old_tail.as_mut().next = Some(new_node);
new_node.as_mut().prev = Some(old_tail);
}
}
self.tail = Some(new_node);
self.len += 1;
}
}
pub fn pop_front(&mut self) -> Option<T> {
self.head.map(|old_head| unsafe {
let old_head = Box::from_raw(old_head.as_ptr());
self.head = old_head.next;
if let Some(mut head) = self.head {
head.as_mut().prev = None;
} else {
self.tail = None;
}
self.len -= 1;
old_head.element
})
}
pub fn pop_back(&mut self) -> Option<T> {
self.tail.map(|old_tail| unsafe {
let old_tail = Box::from_raw(old_tail.as_ptr());
self.tail = old_tail.prev;
if let Some(mut tail) = self.tail {
tail.as_mut().next = None;
} else {
self.head = None;
}
self.len -= 1;
old_tail.element
})
}
pub fn peek_front(&self) -> Option<&T> {
self.head.map(|head| unsafe { &head.as_ref().element })
}
pub fn peek_front_mut(&mut self) -> Option<&mut T> {
self.head
.map(|mut head| unsafe { &mut head.as_mut().element })
}
pub fn peek_back(&self) -> Option<&T> {
self.tail.map(|tail| unsafe { &tail.as_ref().element })
}
pub fn peek_back_mut(&mut self) -> Option<&mut T> {
self.tail
.map(|mut tail| unsafe { &mut tail.as_mut().element })
}
pub fn iter(&self) -> Iter<'_, T> {
Iter {
next: self.head,
marker: PhantomData,
}
}
pub fn iter_mut(&mut self) -> IterMut<'_, T> {
IterMut {
next: self.head,
marker: PhantomData,
}
}
pub fn len(&self) -> usize {
self.len
}
pub fn is_empty(&self) -> bool {
self.len == 0
}
pub fn clear(&mut self) {
*self = Self::new();
}
}
impl<T> Drop for LinkedList<T> {
fn drop(&mut self) {
while self.pop_front().is_some() {}
}
}
impl<'a, T> Iterator for Iter<'a, T> {
type Item = &'a T;
fn next(&mut self) -> Option<Self::Item> {
self.next.map(|node| unsafe {
let node = &*node.as_ptr();
self.next = node.next;
&node.element
})
}
}
impl<'a, T> Iterator for IterMut<'a, T> {
type Item = &'a mut T;
fn next(&mut self) -> Option<Self::Item> {
self.next.map(|node| unsafe {
let node = &mut *node.as_ptr();
self.next = node.next;
&mut node.element
})
}
}
impl<T> IntoIterator for LinkedList<T> {
type Item = T;
type IntoIter = IntoIter<T>;
fn into_iter(mut self) -> Self::IntoIter {
let next = self.head.take();
IntoIter { next }
}
}
impl<T> Iterator for IntoIter<T> {
type Item = T;
fn next(&mut self) -> Option<Self::Item> {
self.next.map(|node| unsafe {
let node = Box::from_raw(node.as_ptr());
self.next = node.next;
node.element
})
}
}
unsafe impl<T: Send> Send for LinkedList<T> {}
unsafe impl<T: Sync> Sync for LinkedList<T> {}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_new() {
let list: LinkedList<i32> = LinkedList::new();
assert!(list.is_empty());
assert_eq!(list.len(), 0);
}
#[test]
fn test_push_front() {
let mut list = LinkedList::new();
list.push_front(1);
list.push_front(2);
list.push_front(3);
assert_eq!(list.len(), 3);
assert_eq!(list.pop_front(), Some(3));
assert_eq!(list.pop_front(), Some(2));
assert_eq!(list.pop_front(), Some(1));
assert_eq!(list.pop_front(), None);
assert!(list.is_empty());
}
#[test]
fn test_push_back() {
let mut list = LinkedList::new();
list.push_back(1);
list.push_back(2);
list.push_back(3);
assert_eq!(list.len(), 3);
assert_eq!(list.pop_front(), Some(1));
assert_eq!(list.pop_front(), Some(2));
assert_eq!(list.pop_front(), Some(3));
assert_eq!(list.pop_front(), None);
assert!(list.is_empty());
}
#[test]
fn test_pop_front() {
let mut list = LinkedList::new();
list.push_back(1);
list.push_back(2);
list.push_back(3);
assert_eq!(list.pop_front(), Some(1));
assert_eq!(list.pop_front(), Some(2));
assert_eq!(list.pop_front(), Some(3));
assert_eq!(list.pop_front(), None);
assert!(list.is_empty());
}
#[test]
fn test_pop_back() {
let mut list = LinkedList::new();
list.push_back(1);
list.push_back(2);
list.push_back(3);
assert_eq!(list.pop_back(), Some(3));
assert_eq!(list.pop_back(), Some(2));
assert_eq!(list.pop_back(), Some(1));
assert_eq!(list.pop_back(), None);
assert!(list.is_empty());
}
#[test]
fn test_iter() {
let mut list = LinkedList::new();
list.push_back(1);
list.push_back(2);
list.push_back(3);
let mut iter = list.iter();
assert_eq!(iter.next(), Some(&1));
assert_eq!(iter.next(), Some(&2));
assert_eq!(iter.next(), Some(&3));
assert_eq!(iter.next(), None);
}
#[test]
fn test_into_iter() {
let mut list = LinkedList::new();
list.push_back(1);
list.push_back(2);
list.push_back(3);
let mut iter = list.into_iter();
assert_eq!(iter.next(), Some(1));
assert_eq!(iter.next(), Some(2));
assert_eq!(iter.next(), Some(3));
assert_eq!(iter.next(), None);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment