@stepancheg
Last active June 9, 2017 06:37
Explanation why patch works

Lengthy explanation why patch works.

The sender part is simple: it either pushes a message or sets the "closed" flag, and then calls notify if a receiver task is registered.
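The sender's two-step order (update state, then notify) can be sketched as follows. This is only an illustration under simplifying assumptions: `Shared`, `notify_receiver`, the `parked` flag and the `notifications` counter are hypothetical stand-ins, and a `Mutex<VecDeque<T>>` replaces the lock-free queue used by the real channel.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Mutex;

/// Hypothetical stand-in for the state shared by senders and the receiver.
struct Shared<T> {
    queue: Mutex<VecDeque<T>>,
    closed: AtomicBool,
    /// `true` while a receiver task is registered (parked).
    parked: AtomicBool,
    /// Counts wake-ups; stands in for calling notify on the parked task.
    notifications: AtomicUsize,
}

impl<T> Shared<T> {
    fn new() -> Self {
        Shared {
            queue: Mutex::new(VecDeque::new()),
            closed: AtomicBool::new(false),
            parked: AtomicBool::new(false),
            notifications: AtomicUsize::new(0),
        }
    }

    /// Wake the receiver only if a task is currently registered.
    fn notify_receiver(&self) {
        if self.parked.swap(false, Ordering::SeqCst) {
            self.notifications.fetch_add(1, Ordering::SeqCst);
        }
    }

    /// Step 1: push the message. Step 2: notify.
    fn send(&self, msg: T) {
        self.queue.lock().unwrap().push_back(msg);
        self.notify_receiver();
    }

    /// Step 1: set the "closed" flag. Step 2: notify.
    fn close(&self) {
        self.closed.store(true, Ordering::SeqCst);
        self.notify_receiver();
    }
}
```

The point of the sketch is the ordering inside `send` and `close`: the state change is always made visible before the notification is attempted.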

The interesting part is this function:

poll_queue_and_open

The function starts with an optimistic pop:

return match self.queue_pop_spin() {
    Async::Ready(msg) => Async::Ready(Some(msg)),
    Async::NotReady => { ... }
}

This part is an optimization. If it returns Ready, the caller will call this function again, so returning early is safe.

So from a correctness point of view, poll_queue_and_open can be reduced to:

    fn poll_queue_and_open(&self) -> Async<Option<T>> {
        if self.is_open() {
            self.receiver_park();
        }

        let open = self.is_open();

        match self.queue_pop_spin() {
            Async::NotReady => {
                if open {
                    Async::NotReady
                } else {
                    Async::Ready(None)
                }
            },
            Async::Ready(msg) => Async::Ready(Some(msg)),
        }
    }

Park

The function parks at the beginning:

if self.is_open() {
    self.receiver_park();
}

If the channel is open, the receiver always parks, regardless of whether the queue is empty.

If the channel is closed, there is no need to park, because the receiver simply drains the queue. The self.is_open() condition is an optimization that avoids parking when the channel is closed.

The remaining part of the function is:

        let open = self.is_open();

        match self.queue_pop_spin() {
            Async::NotReady => {
                if open {
                    Async::NotReady
                } else {
                    Async::Ready(None)
                }
            },
            Async::Ready(msg) => Async::Ready(Some(msg)),
        }

Let's start by looking at a non-blocking CloseableQueue abstraction.

CloseableQueue

use std::sync::atomic::{AtomicBool, Ordering};

struct CloseableQueue<T> {
    queue: Queue<T>,
    closed: AtomicBool,
}

We do not want to have it in the library, because there the closed flag is stored in the same field as the number of senders. But for the sake of explanation, let's look at this abstraction.

impl<T> CloseableQueue<T> {
    fn close(&self) { self.closed.store(true, Ordering::SeqCst); }
    fn push(&self, item: T) { self.queue.push(item); }

    // `Async::Ready(None)` means EOF
    fn pop(&self) -> Async<Option<T>> {
        // It is essential to load the `closed` flag before the inner `pop`;
        // otherwise there's a chance of returning `Async::Ready(None)` with a non-empty queue.
        // And the receiver does not poll or check any state after EOF.
        let closed = self.closed.load(Ordering::SeqCst);
        // The inner queue's `pop` is assumed to return `Async<T>`, like `queue_pop_spin`.
        match self.queue.pop() {
            Async::Ready(item) => Async::Ready(Some(item)),
            Async::NotReady => {
                if closed {
                    Async::Ready(None)
                } else {
                    Async::NotReady
                }
            }
        }
    }
}
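A self-contained version of this abstraction can be sketched to try the drain-then-EOF behaviour. It assumes a `Mutex<VecDeque<T>>` in place of the lock-free `Queue<T>` and a locally defined `Async` enum in place of the one from the futures crate; both are stand-ins for illustration, not the library's types.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Mutex;

/// Local stand-in for the `Async` type from the futures crate.
#[derive(Debug, PartialEq)]
enum Async<T> {
    Ready(T),
    NotReady,
}

struct CloseableQueue<T> {
    /// Stand-in for the lock-free queue.
    queue: Mutex<VecDeque<T>>,
    closed: AtomicBool,
}

impl<T> CloseableQueue<T> {
    fn new() -> Self {
        CloseableQueue {
            queue: Mutex::new(VecDeque::new()),
            closed: AtomicBool::new(false),
        }
    }

    fn close(&self) {
        self.closed.store(true, Ordering::SeqCst);
    }

    fn push(&self, item: T) {
        self.queue.lock().unwrap().push_back(item);
    }

    /// `Async::Ready(None)` means EOF.
    fn pop(&self) -> Async<Option<T>> {
        // Load `closed` BEFORE popping: if the flag were read after an
        // empty pop, a push followed by close could slip in between and
        // EOF would be reported while an item is still queued.
        let closed = self.closed.load(Ordering::SeqCst);
        match self.queue.lock().unwrap().pop_front() {
            Some(item) => Async::Ready(Some(item)),
            None if closed => Async::Ready(None),
            None => Async::NotReady,
        }
    }
}
```

With this sketch, items pushed before `close` are still returned by `pop`, and `Ready(None)` appears only once the queue has been drained.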

So, the last fragment of the poll_queue_and_open function:

                let open = self.is_open();

                match self.queue_pop_spin() {
                    Async::NotReady => {
                        if open {
                            Async::NotReady
                        } else {
                            Async::Ready(None)
                        }
                    },
                    Async::Ready(msg) => Async::Ready(Some(msg)),
                }

is essentially:

self.closeable_queue.pop()

So our function can be reduced to:

    fn poll_queue_and_open(&self) -> Async<Option<T>> {
        if self.is_open() {
            self.receiver_park();
        }

        self.closeable_queue.pop()
    }

Why is it correct? The function can return one of three states: NotReady, Ready(None) and Ready(Some(item)).

If the function returns Ready(None), it is correct, because the correctness of closeable_queue is explained above.

If the function returns Ready(Some(item)), it is correct, because the function will be called again.

If the function returns NotReady, it is tricky. The receiver parks first and checks the queue state afterwards, while the sender updates the queue state first and attempts to unpark afterwards.

So after the function returns NotReady, one of two states is possible:

  • the queue is empty and not closed
  • the queue is non-empty or closed, AND the sender has either already called notify or is about to, so the receiver will be polled again
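The ordering argument can be checked with a small model that enumerates every interleaving of the receiver's two steps with the sender's two steps. This is a deliberately simplified sketch: `Step`, `State`, `run`, `interleavings` and `never_loses_wakeup` are hypothetical names, each step is treated as atomic, and only a single push is modelled.

```rust
/// One atomic step of either side in the simplified model.
#[derive(Clone, Copy, Debug, PartialEq)]
enum Step {
    Park,   // receiver registers its task
    Check,  // receiver looks at the queue
    Push,   // sender makes an item visible
    Notify, // sender wakes the receiver if a task is registered
}

#[derive(Default)]
struct State {
    has_item: bool,
    parked: bool,
    notified: bool,
    saw_item: bool,
}

/// Runs one schedule; returns `true` if the receiver either saw the
/// item itself or was notified, i.e. no wake-up was lost.
fn run(schedule: &[Step]) -> bool {
    let mut s = State::default();
    for step in schedule {
        match step {
            Step::Park => s.parked = true,
            Step::Check => s.saw_item = s.has_item,
            Step::Push => s.has_item = true,
            Step::Notify => {
                if s.parked {
                    s.notified = true
                }
            }
        }
    }
    s.saw_item || s.notified
}

/// All merges of `a` and `b` that keep each side's own order.
fn interleavings(a: &[Step], b: &[Step]) -> Vec<Vec<Step>> {
    if a.is_empty() {
        return vec![b.to_vec()];
    }
    if b.is_empty() {
        return vec![a.to_vec()];
    }
    let mut out = Vec::new();
    for mut rest in interleavings(&a[1..], b) {
        rest.insert(0, a[0]);
        out.push(rest);
    }
    for mut rest in interleavings(a, &b[1..]) {
        rest.insert(0, b[0]);
        out.push(rest);
    }
    out
}

/// `true` if no interleaving of the two sides loses a wake-up.
fn never_loses_wakeup(receiver: &[Step], sender: &[Step]) -> bool {
    interleavings(receiver, sender).iter().all(|s| run(s))
}
```

In this model, the receiver order Park then Check combined with the sender order Push then Notify survives all six interleavings. Swapping the receiver's steps to Check then Park admits the schedule Check, Push, Notify, Park, where the receiver sees an empty queue and the notification arrives before any task is registered.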

A race

There's a race in the implementation.

If the receiver explicitly closes the channel concurrently with sender.send, send may report success even though the message will never be received by the receiver.

XXX
