Skip to content

Instantly share code, notes, and snippets.

@jorendorff
Created September 19, 2016 13:37
Show Gist options
  • Save jorendorff/3c4c05fa84ebfbb20ebbf71c8ea05e37 to your computer and use it in GitHub Desktop.
Save jorendorff/3c4c05fa84ebfbb20ebbf71c8ea05e37 to your computer and use it in GitHub Desktop.
// Incomplete channel implementation using a MPSC node-based queue.
//
// Algorithm: <http://www.1024cores.net/home/lock-free-algorithms/queues/non-intrusive-mpsc-node-based-queue>
//
// Differences from `std::sync::mpsc::channel`:
// - blocking `recv` is not implemented
// - closing the channel by dropping senders is not implemented
// - closing the channel by dropping the receiver is not implemented
// - nodes are leaked when the receiver is dropped
use std::ptr::{self, null_mut};
use std::mem;
use std::sync::atomic::AtomicPtr;
use std::sync::atomic::Ordering::*;
use std::sync::Arc;
use std::cell::Cell;
struct Node<T> {
next: AtomicPtr<Node<T>>,
data: T
}
#[derive(Clone)]
struct Sender<T> {
head: Arc<AtomicPtr<Node<T>>>
}
struct Receiver<T> {
front: Cell<*mut Node<T>>
}
unsafe impl<T> Send for Receiver<T> {}
fn channel<T>() -> (Sender<T>, Receiver<T>) {
let stub = Box::into_raw(Box::new(Node {
next: AtomicPtr::new(null_mut()),
data: unsafe { mem::uninitialized() }
}));
(
Sender { head: Arc::new(AtomicPtr::new(stub)) },
Receiver { front: Cell::new(stub) }
)
}
impl<T> Sender<T> {
fn send(&self, value: T) {
let n = Box::into_raw(Box::new(Node {
next: AtomicPtr::new(null_mut()),
data: value
}));
let prev = self.head.swap(n, AcqRel); // serialization-point wrt producers
unsafe {
(*prev).next.store(n, Release); // serialization-point wrt consumer
}
}
}
impl<T> Receiver<T> {
fn recv(&self) -> Option<T> {
unsafe {
let front = self.front.get();
let next = (*front).next.load(Acquire); // serialization-point wrt producers
if next.is_null() {
None
} else {
self.front.set(next);
let uninitialized_data = Box::from_raw(front).data; // free `front`, but
mem::forget(uninitialized_data); // don't drop this uninitialized value
Some(ptr::read(&(*next).data))
}
}
}
}
impl<T: Send> Iterator for Receiver<T> {
type Item = T;
fn next(&mut self) -> Option<T> { self.recv() }
}
#[cfg(test)]
mod tests {
use super::{channel, Sender, Receiver};
use std::thread::spawn;
#[test]
fn test_fifo_ints() {
let (s, r) = channel();
assert_eq!(r.recv(), None);
s.send(0);
assert_eq!(r.recv(), Some(0));
assert_eq!(r.recv(), None);
s.send(17);
assert_eq!(r.recv(), Some(17));
assert_eq!(r.recv(), None);
const N: u32 = 7;
for i in 0..N {
s.send(i);
}
let expected: Vec<u32> = (0..N).collect();
let popped: Vec<u32> = (0..N).map(|_| r.recv().expect("next item in queue")).collect();
assert_eq!(popped, expected);
assert_eq!(r.recv(), None);
}
#[test]
fn test_fifo_strings() {
let (s, r) = channel();
const N: u32 = 100;
let mut log = vec![];
for i in 0..N {
let str = format!("string #{}", i);
log.push(str.clone());
s.send(str);
}
assert_eq!(r.collect::<Vec<String>>(), log);
}
fn send_ints(sender: Sender<i32>) {
for i in 0 .. 10 {
sender.send(i);
}
}
fn recv_ints(receiver: Receiver<i32>) {
let mut log = vec![];
while log.len() < 10 {
if let Some(v) = receiver.recv() {
log.push(v);
}
}
assert_eq!(log, [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
}
#[test]
fn test_spsc_1() {
// Sender can be moved to another thread.
let (s, r) = channel();
let h = spawn(move || send_ints(s));
recv_ints(r);
h.join().unwrap();
}
#[test]
fn test_spsc_2() {
// Receiver can be moved to another thread.
let (s, r) = channel();
let h = spawn(move || recv_ints(r));
send_ints(s);
h.join().unwrap();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment