Skip to content

Instantly share code, notes, and snippets.

@pchiusano
Created September 3, 2013 18:12
Show Gist options
  • Save pchiusano/6427498 to your computer and use it in GitHub Desktop.
Save pchiusano/6427498 to your computer and use it in GitHub Desktop.
Actor memory consistency
package woot
import scalaz.concurrent._
/*
Test to see if `@volatile` is required for mutable fields closed over by an `Actor`
using `Strategy.Sequential`. As a test, we have an actor that closes over a integer
counter, which it increments for each message it receives. If memory effects are
visible to subsequent messages, the count will be equal to the number of messages
sent to the actor.
Output (showing thread name, current count):
T2 0
T1 1
T2 2
T1 3
T2 4
T1 5
T2 6
T1 7
T2 8
run-main 9
So at least for this example, it looks like even with thread switching, effects of
processing previous messages are always visible when processing subsequent messages.
This is just absence of disproof, though, not a proof.
Runar's explanation:
"However, whenever you enter the actor you are creating a new stack frame which,
if I understand correctly, will read the current value of the closed-over variable
from the heap."
So the argument is that each time the `handler: A => Unit` function of an `Actor[A]`
gets called, that is a fresh stack frame, so any mutable variables it closes over
must be read from the heap. We would only have a problem if we switched between
threads midway through running one call to `handler` - if we did this, the second
thread might not see all the effects of the first. But the actor model explicitly
guarantees that messages are processed sequentially, so it is never the case that
two threads are running the handler at the same time.
So reads of mutable fields closed over must fetch the current value from the heap,
but are we guaranteed that all writes to a mutable field will be flushed to the
heap before the next read occurs?
*/
object ActorTesting extends App {
/*
* Sending `()` to this `Actor` increments an internal counter.
* We expect that the count will be equal to the number of
* messages sent to the actor -- since regardless of what thread
* each message is processed on, we expect that each message will
* use the latest count set when processing the previous message.
*/
def counter: Actor[Unit] = {
var i = 0
Actor.actor[Unit] { _ =>
println(s"${Thread.currentThread.getName} $i")
i = i + 1
} (Strategy.Sequential)
}
val a = counter
// increment the count concurrently from two threads
// the sleeps are to ensure there is actually some thread
// switching
val t1 = new Thread("T1") { override def run = {
Thread.sleep(1000)
a ! ()
Thread.sleep(2000)
a ! ()
Thread.sleep(1000)
a ! ()
Thread.sleep(2000)
a ! ()
}}
val t2 = new Thread("T2") { override def run = {
a ! ()
Thread.sleep(1000)
a ! ()
Thread.sleep(2000)
a ! ()
Thread.sleep(1000)
a ! ()
Thread.sleep(2000)
a ! ()
}}
t1.start
t2.start
t1.join; t2.join
a ! ()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment