use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
// Flavors:
//  - Synchronous channels: Channel where send() can block. Limited capacity.
//    - Mutex + Condvar + VecDeque
//    - Atomic VecDeque (atomic queue) + thread::park + thread::Thread::unpark
//  - Asynchronous channels: Channel where send() cannot block. Unbounded.
//    - Mutex + Condvar + VecDeque
//    - Mutex + Condvar + LinkedList
//    - Atomic linked list, linked list of T
//    - Atomic block linked list, linked list of atomic VecDeque<T>
//  - Rendezvous channels: Synchronous with capacity = 0. Used for thread synchronization.
//  - Oneshot channels: Any capacity. In practice, only one call to send().
//    async/await
pub struct Sender<T> {
    shared: Arc<Shared<T>>,
}

impl<T> Clone for Sender<T> {
    fn clone(&self) -> Self {
        let mut inner = self.shared.inner.lock().unwrap();
        // Track the number of live senders so the receiver can detect
        // when the channel has been closed.
        inner.senders += 1;
        drop(inner);
        Sender {
            shared: Arc::clone(&self.shared),
        }
    }
}

impl<T> Drop for Sender<T> {
    fn drop(&mut self) {
        let mut inner = self.shared.inner.lock().unwrap();
        inner.senders -= 1;
        let was_last = inner.senders == 0;
        drop(inner);
        if was_last {
            // Wake the receiver so a blocked recv() can observe that the
            // last sender is gone and return None.
            self.shared.available.notify_one();
        }
    }
}

impl<T> Sender<T> {
    pub fn send(&mut self, t: T) {
        let mut inner = self.shared.inner.lock().unwrap();
        inner.queue.push_back(t);
        // Release the lock before notifying so the woken receiver can
        // immediately acquire it.
        drop(inner);
        self.shared.available.notify_one();
    }
}
pub struct Receiver<T> {
    shared: Arc<Shared<T>>,
    // Local buffer of already-dequeued items, so a burst of sends can be
    // drained with a single lock acquisition.
    buffer: VecDeque<T>,
}

impl<T> Receiver<T> {
    pub fn recv(&mut self) -> Option<T> {
        // Serve from the local buffer first, without taking the lock.
        if let Some(t) = self.buffer.pop_front() {
            return Some(t);
        }
        let mut inner = self.shared.inner.lock().unwrap();
        loop {
            match inner.queue.pop_front() {
                Some(t) => {
                    // While we hold the lock anyway, steal the rest of the
                    // queue into the local buffer for later recv() calls.
                    std::mem::swap(&mut self.buffer, &mut inner.queue);
                    return Some(t);
                }
                // Queue empty and no senders remain: the channel is closed.
                None if inner.senders == 0 => return None,
                None => {
                    // Block until a sender signals that the queue is
                    // non-empty (or that the last sender has gone away).
                    inner = self.shared.available.wait(inner).unwrap();
                }
            }
        }
    }
}

impl<T> Iterator for Receiver<T> {
    type Item = T;
    fn next(&mut self) -> Option<Self::Item> {
        self.recv()
    }
}
struct Inner<T> {
    queue: VecDeque<T>,
    senders: usize,
}

struct Shared<T> {
    inner: Mutex<Inner<T>>,
    available: Condvar,
}

pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let inner = Inner {
        queue: VecDeque::default(),
        senders: 1,
    };
    let shared = Shared {
        inner: Mutex::new(inner),
        available: Condvar::new(),
    };
    let shared = Arc::new(shared);
    (
        Sender {
            shared: Arc::clone(&shared),
        },
        Receiver {
            shared: Arc::clone(&shared),
            buffer: Default::default(),
        },
    )
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn ping_pong() {
        let (mut tx, mut rx) = channel();
        tx.send(42);
        assert_eq!(rx.recv(), Some(42));
    }

    #[test]
    fn closed_tx() {
        let (tx, mut rx) = channel::<()>();
        drop(tx);
        assert_eq!(rx.recv(), None);
    }

    #[test]
    fn closed_rx() {
        let (mut tx, rx) = channel();
        drop(rx);
        tx.send(42);
    }
}
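A minimal usage sketch (not part of the original gist; the function name usage_sketch and the 0..3 values are purely illustrative): one producer thread sends a few values, and the receiver drains them through the Iterator impl until the sender is dropped.

// Usage sketch (illustrative): one producer thread plus the Iterator impl.
fn usage_sketch() {
    let (mut tx, rx) = channel();
    let producer = std::thread::spawn(move || {
        for i in 0..3 {
            tx.send(i);
        }
        // `tx` is dropped here, which closes the channel.
    });
    // `collect` keeps calling recv() until it returns None.
    let received: Vec<i32> = rx.collect();
    producer.join().unwrap();
    assert_eq!(received, vec![0, 1, 2]);
}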
I actually already talked through that approach in the video :)
oops...
In line 113, why don't you pass buffer to the Receiver?
Thanks for the videos (:
@jbesraa By "buffer" do you mean shared? We could totally do that!
Oooooh, I see what you mean! Yes, the code is incomplete. The Receiver constructor needs:
    buffer: Default::default(),
@jonhoo In this implementation of mpsc, what's stopping us from cloning into multiple consumers? Since the Receiver also uses an Arc, we can write the following (after commenting out the buffer field in Receiver, which was added on the assumption that there would always be a single consumer, just for simplicity's sake):
#[test]
fn multiple_rx() { // This test passes!
    let (mut tx, rx) = channel();
    let mut handles = Vec::new();
    tx.send(1);
    tx.send(2);
    let mut rx1 = Receiver {
        shared: Arc::clone(&rx.shared),
    };
    handles.push(std::thread::spawn(move || {
        let recv_val = rx1.recv();
        println!("{:#?}", recv_val);
        assert_eq!(Some(1), recv_val);
    }));
    let mut rx2 = Receiver {
        shared: Arc::clone(&rx.shared),
    };
    handles.push(std::thread::spawn(move || {
        std::thread::sleep(std::time::Duration::from_millis(100));
        let recv_val = rx2.recv();
        println!("{:#?}", recv_val);
        assert_eq!(Some(2), recv_val);
    }));
    for handle in handles {
        handle.join().unwrap();
    }
}
And this test passes! So there's nothing really preventing us from having multiple consumers. Is that intended, or is there another explanation?
Thanks for the great content!
Yep, that would work just fine for this implementation!
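To make that official, here is a minimal sketch of a Clone impl for Receiver (not part of the gist; it assumes the queue-swap optimization in recv() is removed, since one receiver could otherwise swap the entire queue into its private buffer and starve the others):

// Sketch only: making Receiver cloneable for multiple consumers.
impl<T> Clone for Receiver<T> {
    fn clone(&self) -> Self {
        Receiver {
            shared: Arc::clone(&self.shared),
            // Each clone starts with its own empty local buffer.
            buffer: VecDeque::new(),
        }
    }
}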
Thanks for the video! It was invaluable.
I have a small question... In the implementation of send, why do we need to drop the lock before calling notify_one? What are the drawbacks of dropping the lock after the notification? E.g.:
pub fn send(&mut self, t: T) {
    let mut inner = self.shared.inner.lock().unwrap();
    inner.queue.push_back(t);
    // drop(inner);
    self.shared.available.notify_one();
    // `inner` is dropped automatically here
}
Thanks in advance!
Ah, if you drop the lock after notifying, the other thread may be woken up, try and then fail to grab the lock because the first thread hasn't released it yet. It'll therefore go to sleep again, and then not get woken up since the one notify call that would have awoken it had already passed.
Since we're using an Arc between Sender and Receiver, why not use Arc::strong_count to check whether any Sender is still alive? It's an mpsc channel, so if the Receiver sees strong_count == 1, we know only the Receiver itself is alive and there is no Sender left.
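For illustration, a sketch of that suggestion (the name recv_counted is hypothetical, not from the gist; it assumes there is exactly one Receiver, so a strong count of 1 means every Sender is gone):

// Sketch: recv() using Arc::strong_count instead of a `senders` counter.
// Sender's Drop would still need to lock and notify the Condvar, or a
// blocked recv_counted() would never re-check the count.
impl<T> Receiver<T> {
    pub fn recv_counted(&mut self) -> Option<T> {
        let mut inner = self.shared.inner.lock().unwrap();
        loop {
            match inner.queue.pop_front() {
                Some(t) => return Some(t),
                // Our Arc is the only one left: every Sender has been dropped.
                None if Arc::strong_count(&self.shared) == 1 => return None,
                None => inner = self.shared.available.wait(inner).unwrap(),
            }
        }
    }
}

One caveat with this sketch: Sender's Drop body runs before its Arc field is released, so a recv_counted() woken by the final notify_one may still observe the old count and go back to sleep; the explicit senders counter avoids that ordering hazard.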