Lengthy explanation why patch works.
Sender part is simple: it (either pushes a message or sets "closed" flag) and then calls notify if any task is registered.
The interesting part is function
Function starts with optimistic pop:
return match self.queue_pop_spin() {
Async::Ready(msg) => Async::Ready(Some(msg)),
Async::NotReady => { ... }
}
This part is an optimization. If it returns Ready
, caller will call this function again. It is safe.
So from correctness point of view, pop_queue_and_open
can be reduced to:
fn poll_queue_and_open(&self) -> Async<Option<T>> {
if self.is_open() {
self.receiver_park();
}
let open = self.is_open();
match self.queue_pop_spin() {
Async::NotReady => {
if open {
Async::NotReady
} else {
Async::Ready(None)
}
},
Async::Ready(msg) => Async::Ready(Some(msg)),
}
}
Function does park at the beginning:
if self.is_open() {
self.receiver_park();
}
If chennel is open, receiver always parks, even if queue is empty.
If channel is closed, then there's no need to park, because receiver simply drains the queue.
self.is_open
condition is an optimization to avoid parking when closed.
Remaining part of the function is:
let open = self.is_open();
match self.queue_pop_spin() {
Async::NotReady => {
if open {
Async::NotReady
} else {
Async::Ready(None)
}
},
Async::Ready(msg) => Async::Ready(Some(msg)),
}
Let's start with looking at non-blocking CloseableQueue
abstraction.
struct CloseableQueue<T> {
queue: Queue<T>,
closed: AtomicBool,
}
We do not want to have it in the library, because closed
flag is shared with number of senders field. But for the explanation let's look at that abstraction.
impl<T> CloseableQueue<T> {
fn close(&self) { self.closed.store(true); }
fn push(&self, item: T) { self.queue.push(item); }
// `Async::Ready(None)` means EOF
fn pop(&self) -> Async<Option<T>> {
// It is essential to load `closed` flag before inner `pop`
// otherwise there's a chance to return `Async::Ready(None)` with non-empty queue.
// And we do not poll and do not check any state after EOF.
let closed = self.closed.load();
match self.pop() {
Async::Ready(item) => Async::Ready(item),
Async::NotReady => {
if closed {
Async::Ready(None)
} else {
Async::NotReady
}
}
}
}
}
So, the last fragment of the pop_queue_and_open
function:
let open = self.is_open();
match self.queue_pop_spin() {
Async::NotReady => {
if open {
Async::NotReady
} else {
Async::Ready(None)
}
},
Async::Ready(msg) => Async::Ready(Some(msg)),
}
is essentially:
self.closeable_queue.pop()
So our function can be reduced to:
fn poll_queue_and_open(&self) -> Async<Option<T>> {
if self.is_open() {
self.receiver_park();
}
self.closeable_queue.pop()
}
Why is it correct? Function can return three states: NotReady
, Ready(None)
and Ready(Some(item))
.
If function returns Ready(None)
then it is correct, because correctness of closeable_queue
is explained above.
If function returns Ready(Some(item))
then it is correct, because it will be called again.
If function return NotReady
, it is tricky. Receiver parks first, and checks for queue state after. But sender updates queue state first, and attempts to unpark after.
So at the end of function after returning NotReady
, either of two states possible:
- queue is empty and not closed
- (queue is not empty or closed) AND sender either already called
notify
or going to do that soon, so receiver will be polled again
There's a race in implementation.
If receiver explicitly closes the channel concurrently with sender.send
, send
may receive success,
but the message won't be received by the receiver.
XXX