The first thing to understand is that the head field inside of scalaz.concurrent.Actor is not the "head" of the message queue in any traditional sense of the word. A better description would be "last". There are no pointers to the head of the queue, which is one of the very clever things about this implementation.
Consider the case where the actor has no outstanding messages. A new message will go into the following code:
def !(a: A): Unit = {
  val n = new Node(a)
  val h = head.getAndSet(n)   // atomically swap in the new "last" node
  if (h ne null) h.lazySet(n) // link the old last node forward to the new one
  else schedule(n)            // the queue was empty: start the actor running
}
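As an aside, here is a rough sketch of how an actor like this is constructed and fed from user code (the exact Actor constructor and Strategy wiring vary between scalaz versions, so treat this as illustrative rather than definitive):

import scalaz.concurrent.{Actor, Strategy}

object Usage extends App {
  implicit val strategy: Strategy = Strategy.DefaultStrategy
  val actor = Actor[String](msg => println(s"got: $msg"))
  actor ! "hello"   // enqueues through the getAndSet path shown above
  Thread.sleep(100) // crude: give the pooled thread time to run the handler
}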
As a preliminary, note that Node extends AtomicReference and contains a data cell, named a. head is itself of type AtomicReference[Node[A]], so clearly we have a linked list of AtomicReferences.
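In other words, Node presumably looks something like this minimal sketch (the actual scalaz definition may differ in visibility modifiers and placement):

import java.util.concurrent.atomic.AtomicReference

// each node *is* its own forward pointer: get() yields the next node, or null
class Node[A](val a: A) extends AtomicReference[Node[A]]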
When no messages are outstanding, head will be set to null, and thus the conditional will fail and immediately call schedule(n). This call kicks off the active execution of the actor, likely on a different thread. This execution takes place in the act method:
@annotation.tailrec
private def act(n: Node[A], i: Int = 1024): Unit = {
  try handler(n.a) catch {
    case ex: Throwable => onError(ex)
  }
  val n2 = n.get                      // read the forward pointer to the next node
  if (n2 eq null) scheduleLastTry(n)  // looks like we hit the end of the queue
  else if (i == 0) schedule(n2)       // batch exhausted: yield the thread
  else act(n2, i - 1)                 // keep draining the queue
}
The initial value of n will be exactly what we passed to schedule(n), which is to say, the new head value. We handle this message and then call n.get, retrieving the next node to handle. Ignore the i counter for now (this is a thread utilization trick); the important bit is the recursive call to act in the event that n2 is not null.
Meanwhile, let's assume that more messages have been coming into the actor. As these messages come in, remember that act is currently running and processing the queue! We run the getAndSet in ! on head, and the result is not null since we had messages outstanding. Thus, we need to add the message to the tail end of the queue. Notice though that we've already set head to the new Node! This is why "head" is a poor descriptor. It really, really is more like "last".
Linking the new last node into the queue is as simple as updating the tail pointer on the old last node, which we do using the lazySet method on AtomicReference. This is a particular point of cleverness, because lazySet is a much faster version of set, but it relaxes write visibility guarantees! In particular, it drops the expensive store-load barrier of a full volatile write, and it does not (in general) force the CPU to go all the way to main memory. The downside is that this value may not be visible to other threads any time soon.
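Here is a tiny self-contained sketch of that link step (the Node class mirrors the one sketched earlier; the names are mine, not scalaz's):

import java.util.concurrent.atomic.AtomicReference

object LazySetSketch extends App {
  class Node[A](val a: A) extends AtomicReference[Node[A]]
  val oldLast = new Node("first")
  val newLast = new Node("second")
  oldLast.lazySet(newLast) // cheap ordered store; may lag for other threads
  println(oldLast.get.a)   // immediately visible on the writing thread
}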
Intuitively, this seems wrong for two reasons. First, what if we have multiple concurrent threads attempting to write messages? Second, how does the message become visible on the act thread? We'll come back to the concurrent writes point, but the visibility question is easily addressed.
Note the call to scheduleLastTry(n) at the end of act when the next node is null. This case represents the state "we think we hit the end of the queue", and it is handled in the following way:
private def scheduleLastTry(n: Node[A]): Unit = strategy(lastTry(n))

// park the actor if n is still the last node; otherwise keep draining
private def lastTry(n: Node[A]): Unit =
  if (!head.compareAndSet(n, null)) act(next(n))

// spin until the lazySet link from n to its successor becomes visible
@annotation.tailrec
private def next(n: Node[A]): Node[A] = {
  val n2 = n.get
  if (n2 ne null) n2
  else next(n)
}
All of this is obfuscation around two things: an atomic check on head to see if there are any extra values that we may have missed, and a call back to act passing the value of next(n). We only call back to act if head is not equal to the value of n. Now, remember that head really should be called "last", and n corresponds to the very last value of the queue that we found in our traversal. Thus, we're really just checking to see if our traversal got to the real end, and we're doing that check atomically. If the check passes, then we null out head and the actor atomically moves back to the "empty actor" state. If the check fails, we go into next(n).
This is the clever bit: next is a busy-wait! Note that all it's doing is repeatedly reading the value of the same AtomicReference, over and over again, until it is non-null. There's no recursive traversal here; it's just the same Node. The reason for this is that we know there are more values in the queue, and the reason we know this is because our atomic check on head (which should be "last") failed. If the check had passed, then we would know that n was in fact the very last node in the queue and we would be done. The check failed though, which means that there is a subsequent node which just isn't visible yet on the current thread (remember: lazySet and not set, which would guarantee immediate visibility).
Note that the node which is in head might not be the next node we need to process. All we know about head is that it contains the very last node in the queue, but there may be several nodes which have come in and not yet become visible on the current thread. It is for this reason that we cannot simply read the value of head; we have to spin-wait on the value of our current node being non-null.
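To make the visibility dance concrete, here is a small two-thread sketch (standalone, with made-up names): a spinning reader polls the same reference until a lazySet write eventually becomes visible.

import java.util.concurrent.atomic.AtomicReference

object SpinWaitSketch extends App {
  val link = new AtomicReference[String](null)
  val reader = new Thread {
    override def run(): Unit = {
      var v = link.get
      while (v eq null) v = link.get // busy-wait, exactly like next above
      println(s"observed $v")
    }
  }
  reader.start()
  link.lazySet("next-node") // eventually visible to the spinning reader
  reader.join()
}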
The reason for lazySet is that it is several times faster than set (usually around 4x-8x). We could use set, which would mean that next(n) would almost always return without iteration. This would of course make next a lot faster, but the penalty would be making ! dramatically slower! Not only that, but it would make ! slower in the common case in exchange for making next faster in the uncommon case. Thus, we fast-path optimize by relaxing our write barrier semantics, and the result is a screaming-fast ! function.
This optimization is only made possible because of the atomic check on head. Because we have an atomic cell with full memory barrier semantics that allows us to efficiently check for the existence of further elements, we are able to enter the spin wait with atomic certainty.
The only remaining case we have not addressed is that of concurrent writers. There are no compareAndSet calls to update the state of the queue inside of !, so intuitively, how do we not lose messages under contention? Recall the definition of !:
def !(a: A): Unit = {
  val n = new Node(a)
  val h = head.getAndSet(n)
  if (h ne null) h.lazySet(n)
  else schedule(n)
}
Imagine multiple threads simultaneously stepping through this function. They each create a new Node, and then they call getAndSet on head with their new node, grabbing whatever value was there before. This intuitively seems like trouble, since they aren't checking for previous values, updating pointers atomically or anything like that. However, it turns out that this works just fine!
One of the threads in this function will get the "old" value of head (perhaps null, or perhaps some other node). The other thread will get the value set by the first thread. In other words, no data is lost; it's just that the node ends up held by another thread under contention!
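This no-loss property is easy to see in isolation. In the sketch below (all names are mine), eight threads race through getAndSet on the same reference; atomicity guarantees each one receives a distinct previous value, so every value is handed off to exactly one thread.

import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.ConcurrentLinkedQueue

object GetAndSetSketch extends App {
  val last = new AtomicReference[Int](0)
  val seen = new ConcurrentLinkedQueue[Int]
  val threads = (1 to 8).map { i =>
    new Thread { override def run(): Unit = seen.add(last.getAndSet(i)) }
  }
  threads.foreach(_.start())
  threads.foreach(_.join())
  println(seen) // eight distinct previous values, in some interleaved order
}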
If the old value of head was null, only one of the threads will get that old null value due to the atomicity of getAndSet. That thread will make the call to schedule(n). Meanwhile, the other thread got the new node set by the first thread and has that value in h. Remember that head is really "last". We got the old value of "last" (in h), and we have atomically stored the new value of "last" in head. This atomic storage signals to act that there are more values to read and triggers the spin-wait in next. The only thing we have left to do is update the forward pointer on the old "last" so that it points along to our new "last", and we do this using lazySet.
Recall that getAndSet is fully atomic regardless of contention. There may be any number of threads in this function simultaneously, and they will all get different values of h. Thus, the fact that every thread is unconditionally calling lazySet is not a problem and does not discard data, since only one thread will ever call lazySet on a given node! The other thing that this implies is that the queue is being concurrently and non-linearly constructed across multiple threads. As contention increases, the amount of non-linearity also goes up, but since no two threads get the same value for h (as guaranteed by getAndSet), ultimate linearization is strongly guaranteed.
As the lazySet writes are eventually flushed to main memory and exposed to other threads, the next function will find the next element of the queue and allow act to continue. There may still be more writes left to flush, but act will correctly identify this case in the lastTry function just as it did before. The actor will not stop until it manages to successfully write null into head, which will in turn atomically trigger any contending threads to re-schedule the actor in the ! function (as applicable).
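Putting all of the pieces together, here is a self-contained miniature of the whole queue discipline (a sketch under my own names, not the scalaz code): several writers enqueue with getAndSet plus lazySet, and a single reader drains with get, a spin-wait, and the final compareAndSet handshake.

import java.util.concurrent.atomic.AtomicReference

object MiniQueueDemo extends App {
  class Node[A](val a: A) extends AtomicReference[Node[A]]
  val head = new AtomicReference[Node[Int]](null)

  def enqueue(a: Int): Node[Int] = {
    val n = new Node(a)
    val h = head.getAndSet(n)   // n becomes the new "last"
    if (h ne null) h.lazySet(n) // link the old last node forward
    n
  }

  val start = enqueue(0) // the real actor would call schedule(start) here
  val writers = (1 to 4).map { t =>
    new Thread { override def run(): Unit = (1 to 1000).foreach(_ => enqueue(t)) }
  }
  writers.foreach(_.start())
  writers.foreach(_.join())

  // Drain like act/lastTry/next: walk forward, spinning on lagging links,
  // until a CAS of head back to null confirms we reached the true last node.
  var n = start
  var count = 1
  while (!head.compareAndSet(n, null)) {
    var n2 = n.get
    while (n2 eq null) n2 = n.get // busy-wait for the lazySet link
    n = n2
    count += 1
  }
  println(s"drained $count messages") // 1 + 4 * 1000 = 4001
}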
This is very dense, very convoluted code, but it is also extremely clever and absurdly fast. The overhead here is very, very low. While I have some quibbles with the code formatting (e.g. the naming of head), I do not believe that there are any bugs or suboptimalities in the algorithm as written.
Nice analysis! Any idea why they check for null and don't use a case class or the Option data type to find out if the next Node exists?