Last active
December 24, 2015 15:09
-
-
Save toffaletti/6817808 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use std::unstable::sync::UnsafeArc; | |
use std::unstable::atomics::{AtomicPtr,Relaxed,Release,Acquire}; | |
use std::ptr::{mut_null, to_mut_unsafe_ptr}; | |
use std::cast; | |
use std::task; | |
use std::comm; | |
use std::fmt; | |
struct Node<T> { | |
next: AtomicPtr<Node<T>>, | |
value: Option<T>, | |
} | |
impl<T> Node<T> { | |
fn new(value: T) -> Node<T> { | |
Node{next: AtomicPtr::new(mut_null()), value: Some(value)} | |
} | |
} | |
impl<T> Default for Node<T> { | |
fn default() -> Node<T> { | |
Node{next: AtomicPtr::new(mut_null()), value: None} | |
} | |
} | |
struct State<T> { | |
stub: Node<T>, | |
head: AtomicPtr<Node<T>>, | |
tail: *mut Node<T>, | |
} | |
struct Queue<T> { | |
priv state: UnsafeArc<State<T>>, | |
} | |
impl<T: Send> Clone for Queue<T> { | |
fn clone(&self) -> Queue<T> { | |
Queue { | |
state: self.state.clone() | |
} | |
} | |
} | |
impl<T: Send> fmt::Default for Queue<T> { | |
fn fmt(value: &Queue<T>, f: &mut std::fmt::Formatter) { | |
write!(f.buf, "Queue({})", value.state.get()); | |
} | |
} | |
impl<T: Send> Queue<T> { | |
pub fn new() -> Queue<T> { | |
let mut q = Queue{state: UnsafeArc::new(State { | |
stub: Default::default(), | |
head: AtomicPtr::new(mut_null()), | |
tail: mut_null(), | |
})}; | |
let stub = q.get_stub_unsafe(); | |
q.get_head().store(stub, Relaxed); | |
q.set_tail(stub); | |
q | |
} | |
pub fn push(&mut self, value: T) { | |
unsafe { | |
let node = cast::transmute(~Node::new(value)); | |
self.push_node(node); | |
} | |
} | |
fn push_node(&mut self, node: *mut Node<T>) { | |
unsafe { | |
(*node).next.store(mut_null(), Release); | |
let prev = (*self.state.get()).head.swap(node, Relaxed); | |
(*prev).next.store(node, Release); | |
} | |
} | |
fn get_stub_unsafe(&mut self) -> *mut Node<T> { | |
unsafe { to_mut_unsafe_ptr(&mut (*self.state.get()).stub) } | |
} | |
fn get_head(&mut self) -> &mut AtomicPtr<Node<T>> { | |
unsafe { &mut (*self.state.get()).head } | |
} | |
fn get_tail(&mut self) -> *mut Node<T> { | |
unsafe { (*self.state.get()).tail } | |
} | |
fn set_tail(&mut self, tail: *mut Node<T>) { | |
unsafe { | |
(*self.state.get()).tail = tail; | |
} | |
} | |
pub fn pop(&mut self) -> Option<T> { | |
unsafe { | |
let mut tail = self.get_tail(); | |
let mut next = (*tail).next.load(Acquire); | |
let stub = self.get_stub_unsafe(); | |
if tail == stub { | |
if mut_null() == next { | |
return None | |
} | |
self.set_tail(next); | |
tail = next; | |
next = (*next).next.load(Acquire); | |
} | |
if next != mut_null() { | |
let tail: ~Node<T> = cast::transmute(tail); | |
self.set_tail(next); | |
return tail.value | |
} | |
let head = self.get_head().load(Relaxed); | |
if tail != head { | |
return None | |
} | |
self.push_node(stub); | |
next = (*tail).next.load(Acquire); | |
if next != mut_null() { | |
let tail: ~Node<T> = cast::transmute(tail); | |
self.set_tail(next); | |
return tail.value | |
} | |
} | |
None | |
} | |
} | |
#[test] | |
fn test() { | |
let nthreads = 8u; | |
let nmsgs = 1000u; | |
let mut q = Queue::new(); | |
assert_eq!(None, q.pop()); | |
for _ in range(0, nthreads) { | |
let (port, chan) = comm::stream(); | |
chan.send(q.clone()); | |
do task::spawn_sched(task::SingleThreaded) { | |
let mut q = port.recv(); | |
for i in range(0, nmsgs) { | |
q.push(i); | |
} | |
} | |
} | |
let mut i = 0u; | |
loop { | |
match q.pop() { | |
None => {}, | |
Some(_) => { | |
i += 1; | |
if i == nthreads*nmsgs { break } | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment