Created
September 19, 2016 13:37
-
-
Save jorendorff/3c4c05fa84ebfbb20ebbf71c8ea05e37 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Incomplete channel implementation using an MPSC node-based queue.
//
// Algorithm: <http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue>
//
// Differences from `std::sync::mpsc::channel`:
// - blocking `recv` is not implemented
// - closing the channel by dropping senders is not implemented
// - closing the channel by dropping the receiver is not implemented
// - nodes are leaked when the receiver is dropped
use std::ptr::{self, null_mut}; | |
use std::mem; | |
use std::sync::atomic::AtomicPtr; | |
use std::sync::atomic::Ordering::*; | |
use std::sync::Arc; | |
use std::cell::Cell; | |
/// One heap-allocated link of the queue.
///
/// The payload is wrapped in `MaybeUninit` because two kinds of node hold no
/// live `T`: the initial stub created by `channel`, and any node whose value
/// has already been moved out by `Receiver::recv` (such a node stays in the
/// list as the new front until the following `recv` frees it). `MaybeUninit`
/// never drops its contents, so freeing those nodes cannot touch a missing or
/// already-moved value.
struct Node<T> {
    /// Successor in FIFO order; null until a producer links the next node.
    next: AtomicPtr<Node<T>>,
    /// Payload; only initialized between `send` and the `recv` that reads it.
    data: mem::MaybeUninit<T>,
}

impl<T> Node<T> {
    /// Allocates an unlinked node on the heap and hands ownership back as a
    /// raw pointer. The caller is responsible for eventually freeing it with
    /// `Box::from_raw`.
    fn alloc(data: mem::MaybeUninit<T>) -> *mut Node<T> {
        Box::into_raw(Box::new(Node {
            next: AtomicPtr::new(null_mut()),
            data,
        }))
    }
}

/// Producer half of the channel. Cloning yields another handle that pushes
/// onto the same queue.
struct Sender<T> {
    /// Most recently pushed node, shared by every clone of this sender.
    head: Arc<AtomicPtr<Node<T>>>,
}

// Written by hand: `#[derive(Clone)]` would add a needless `T: Clone` bound,
// but cloning a sender only clones the `Arc`, never a `T`.
impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        Sender {
            head: Arc::clone(&self.head),
        }
    }
}

// `AtomicPtr<_>` is `Send + Sync` for every pointee, so without these explicit
// impls `Sender<T>` would be `Send + Sync` even for `T` such as `Rc<_>`.
// Providing explicit impls replaces the automatic ones.
//
// SAFETY: a sender only moves `T` values into the queue, so transferring or
// sharing it between threads is sound exactly when `T` itself may be sent to
// another thread. `send` takes `&self` and uses only atomic operations on
// shared state, so concurrent calls through a shared reference are fine.
unsafe impl<T: Send> Send for Sender<T> {}
unsafe impl<T: Send> Sync for Sender<T> {}

/// Consumer half of the channel. There is exactly one per channel.
struct Receiver<T> {
    /// Current front node. Its payload is never live: it is either the stub
    /// or a node whose value was already returned by `recv`.
    front: Cell<*mut Node<T>>,
}

// SAFETY: the receiver exclusively owns the nodes it can reach through
// `front`, and moving it to another thread moves the pending `T` values with
// it, which is sound only when `T: Send`.
unsafe impl<T: Send> Send for Receiver<T> {}

/// Creates an empty channel and returns its `(Sender, Receiver)` pair.
///
/// The queue starts with a single stub node that carries no value; both halves
/// initially point at it.
fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let stub = Node::alloc(mem::MaybeUninit::uninit());
    (
        Sender {
            head: Arc::new(AtomicPtr::new(stub)),
        },
        Receiver {
            front: Cell::new(stub),
        },
    )
}

impl<T> Sender<T> {
    /// Appends `value` to the queue. Never blocks.
    fn send(&self, value: T) {
        let n = Node::alloc(mem::MaybeUninit::new(value));
        // Serialization point with respect to other producers: whoever swaps
        // first is ordered first in the queue.
        let prev = self.head.swap(n, AcqRel);
        // SAFETY: `prev` came out of `head`, so it was produced by
        // `Node::alloc` and its `next` is still null. The receiver frees a
        // node only after observing a non-null `next`, so `prev` stays
        // allocated at least until this store is performed, and it is not
        // touched afterwards.
        unsafe {
            let prev_node: &Node<T> = &*prev;
            // Serialization point with respect to the consumer: this publishes
            // the new node together with its payload.
            prev_node.next.store(n, Release);
        }
    }
}

impl<T> Receiver<T> {
    /// Removes and returns the oldest queued value, or `None` if no value is
    /// available right now. Never blocks.
    fn recv(&self) -> Option<T> {
        let front = self.front.get();
        // SAFETY: `front` always points at a live node allocated by
        // `Node::alloc`; only this receiver ever frees nodes, and it frees a
        // node only after moving `self.front` past it.
        let next = unsafe {
            let front_node: &Node<T> = &*front;
            // Serialization point with respect to producers.
            front_node.next.load(Acquire)
        };
        if next.is_null() {
            return None;
        }
        self.front.set(next);
        // SAFETY: a non-null `next` was published by `send` with `Release`
        // after the node's payload was initialized, and the `Acquire` load
        // above makes that payload visible here. The payload is read exactly
        // once, because `next` now becomes the front node, whose payload is
        // never read again. The old `front` is no longer reachable by the
        // receiver, and producers do not access a node after linking its
        // successor, so it can be freed; `MaybeUninit` ensures its stale
        // payload is not dropped.
        unsafe {
            let next_node: &Node<T> = &*next;
            let value = ptr::read(next_node.data.as_ptr());
            drop(Box::from_raw(front));
            Some(value)
        }
    }
}
impl<T: Send> Iterator for Receiver<T> { | |
type Item = T; | |
fn next(&mut self) -> Option<T> { self.recv() } | |
} | |
#[cfg(test)]
mod tests {
    use super::{channel, Receiver, Sender};
    use std::thread::spawn;

    /// Integers come out in the order they went in, and polling an empty
    /// queue yields `None`.
    #[test]
    fn test_fifo_ints() {
        const N: u32 = 7;

        let (tx, rx) = channel();
        assert_eq!(rx.recv(), None);

        tx.send(0);
        assert_eq!(rx.recv(), Some(0));
        assert_eq!(rx.recv(), None);

        tx.send(17);
        assert_eq!(rx.recv(), Some(17));
        assert_eq!(rx.recv(), None);

        (0..N).for_each(|i| tx.send(i));

        let mut popped: Vec<u32> = Vec::new();
        for _ in 0..N {
            popped.push(rx.recv().expect("next item in queue"));
        }
        let expected: Vec<u32> = (0..N).collect();
        assert_eq!(popped, expected);
        assert_eq!(rx.recv(), None);
    }

    /// Heap-owning values keep FIFO order when the receiver is drained
    /// through its `Iterator` impl.
    #[test]
    fn test_fifo_strings() {
        const N: u32 = 100;

        let (tx, rx) = channel();
        let log: Vec<String> = (0..N).map(|i| format!("string #{}", i)).collect();
        for entry in &log {
            tx.send(entry.clone());
        }
        assert_eq!(rx.collect::<Vec<String>>(), log);
    }

    /// Pushes the integers 0 through 9 in ascending order.
    fn send_ints(sender: Sender<i32>) {
        (0..10).for_each(|i| sender.send(i));
    }

    /// Polls until ten values have arrived, then checks they are 0 through 9
    /// in order.
    fn recv_ints(receiver: Receiver<i32>) {
        let mut log = Vec::new();
        while log.len() < 10 {
            // Extending with an `Option` appends the value only when one was
            // available.
            log.extend(receiver.recv());
        }
        assert_eq!(log, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
    }

    #[test]
    fn test_spsc_1() {
        // Sender can be moved to another thread.
        let (tx, rx) = channel();
        let producer = spawn(move || send_ints(tx));
        recv_ints(rx);
        producer.join().unwrap();
    }

    #[test]
    fn test_spsc_2() {
        // Receiver can be moved to another thread.
        let (tx, rx) = channel();
        let consumer = spawn(move || recv_ints(rx));
        send_ints(tx);
        consumer.join().unwrap();
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment