Skip to content

Instantly share code, notes, and snippets.

@toffaletti
Last active December 24, 2015 15:09
Show Gist options
  • Save toffaletti/6817808 to your computer and use it in GitHub Desktop.
Save toffaletti/6817808 to your computer and use it in GitHub Desktop.
use std::unstable::sync::UnsafeArc;
use std::unstable::atomics::{AtomicPtr,Relaxed,Release,Acquire};
use std::ptr::{mut_null, to_mut_unsafe_ptr};
use std::cast;
use std::task;
use std::comm;
use std::fmt;
struct Node<T> {
next: AtomicPtr<Node<T>>,
value: Option<T>,
}
impl<T> Node<T> {
fn new(value: T) -> Node<T> {
Node{next: AtomicPtr::new(mut_null()), value: Some(value)}
}
}
impl<T> Default for Node<T> {
fn default() -> Node<T> {
Node{next: AtomicPtr::new(mut_null()), value: None}
}
}
struct State<T> {
stub: Node<T>,
head: AtomicPtr<Node<T>>,
tail: *mut Node<T>,
}
struct Queue<T> {
priv state: UnsafeArc<State<T>>,
}
impl<T: Send> Clone for Queue<T> {
fn clone(&self) -> Queue<T> {
Queue {
state: self.state.clone()
}
}
}
impl<T: Send> fmt::Default for Queue<T> {
fn fmt(value: &Queue<T>, f: &mut std::fmt::Formatter) {
write!(f.buf, "Queue({})", value.state.get());
}
}
impl<T: Send> Queue<T> {
pub fn new() -> Queue<T> {
let mut q = Queue{state: UnsafeArc::new(State {
stub: Default::default(),
head: AtomicPtr::new(mut_null()),
tail: mut_null(),
})};
let stub = q.get_stub_unsafe();
q.get_head().store(stub, Relaxed);
q.set_tail(stub);
q
}
pub fn push(&mut self, value: T) {
unsafe {
let node = cast::transmute(~Node::new(value));
self.push_node(node);
}
}
fn push_node(&mut self, node: *mut Node<T>) {
unsafe {
(*node).next.store(mut_null(), Release);
let prev = (*self.state.get()).head.swap(node, Relaxed);
(*prev).next.store(node, Release);
}
}
fn get_stub_unsafe(&mut self) -> *mut Node<T> {
unsafe { to_mut_unsafe_ptr(&mut (*self.state.get()).stub) }
}
fn get_head(&mut self) -> &mut AtomicPtr<Node<T>> {
unsafe { &mut (*self.state.get()).head }
}
fn get_tail(&mut self) -> *mut Node<T> {
unsafe { (*self.state.get()).tail }
}
fn set_tail(&mut self, tail: *mut Node<T>) {
unsafe {
(*self.state.get()).tail = tail;
}
}
pub fn pop(&mut self) -> Option<T> {
unsafe {
let mut tail = self.get_tail();
let mut next = (*tail).next.load(Acquire);
let stub = self.get_stub_unsafe();
if tail == stub {
if mut_null() == next {
return None
}
self.set_tail(next);
tail = next;
next = (*next).next.load(Acquire);
}
if next != mut_null() {
let tail: ~Node<T> = cast::transmute(tail);
self.set_tail(next);
return tail.value
}
let head = self.get_head().load(Relaxed);
if tail != head {
return None
}
self.push_node(stub);
next = (*tail).next.load(Acquire);
if next != mut_null() {
let tail: ~Node<T> = cast::transmute(tail);
self.set_tail(next);
return tail.value
}
}
None
}
}
#[test]
fn test() {
let nthreads = 8u;
let nmsgs = 1000u;
let mut q = Queue::new();
assert_eq!(None, q.pop());
for _ in range(0, nthreads) {
let (port, chan) = comm::stream();
chan.send(q.clone());
do task::spawn_sched(task::SingleThreaded) {
let mut q = port.recv();
for i in range(0, nmsgs) {
q.push(i);
}
}
}
let mut i = 0u;
loop {
match q.pop() {
None => {},
Some(_) => {
i += 1;
if i == nthreads*nmsgs { break }
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment