Created September 3, 2013 18:12
Actor memory consistency
package woot

import scalaz.concurrent._

/*
Test to see if `@volatile` is required for mutable fields closed over by an `Actor`
using `Strategy.Sequential`. As a test, we have an actor that closes over an integer
counter, which it increments for each message it receives. If memory effects are
visible to subsequent messages, the count will be equal to the number of messages
sent to the actor.

Output (showing thread name, current count):

  T2 0
  T1 1
  T2 2
  T1 3
  T2 4
  T1 5
  T2 6
  T1 7
  T2 8
  run-main 9

So at least for this example, it looks like even with thread switching, effects of
processing previous messages are always visible when processing subsequent messages.
This is just absence of disproof, though, not a proof.

Runar's explanation:

"However, whenever you enter the actor you are creating a new stack frame which,
if I understand correctly, will read the current value of the closed-over variable
from the heap."

So the argument is that each time the `handler: A => Unit` function of an `Actor[A]`
gets called, that is a fresh stack frame, so any mutable variables it closes over
must be read from the heap. We would only have a problem if we switched between
threads midway through running one call to `handler` - if we did this, the second
thread might not see all the effects of the first. But the actor model explicitly
guarantees that messages are processed sequentially, so it is never the case that
two threads are running the handler at the same time.

So reads of mutable fields closed over must fetch the current value from the heap,
but are we guaranteed that all writes to a mutable field will be flushed to the
heap before the next read occurs?
*/
object ActorTesting extends App {

  /*
   * Sending `()` to this `Actor` increments an internal counter.
   * We expect that the count will be equal to the number of
   * messages sent to the actor -- since regardless of what thread
   * each message is processed on, we expect that each message will
   * use the latest count set when processing the previous message.
   */
  def counter: Actor[Unit] = {
    var i = 0
    Actor.actor[Unit] { _ =>
      println(s"${Thread.currentThread.getName} $i")
      i = i + 1
    } (Strategy.Sequential)
  }

  val a = counter

  // increment the count concurrently from two threads
  // the sleeps are to ensure there is actually some thread
  // switching
  val t1 = new Thread("T1") { override def run(): Unit = {
    Thread.sleep(1000)
    a ! ()
    Thread.sleep(2000)
    a ! ()
    Thread.sleep(1000)
    a ! ()
    Thread.sleep(2000)
    a ! ()
  }}
  val t2 = new Thread("T2") { override def run(): Unit = {
    a ! ()
    Thread.sleep(1000)
    a ! ()
    Thread.sleep(2000)
    a ! ()
    Thread.sleep(1000)
    a ! ()
    Thread.sleep(2000)
    a ! ()
  }}
  t1.start()
  t2.start()
  t1.join(); t2.join()
  a ! ()
}
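
/*
 * A heavier variant of the same experiment, as a minimal sketch and still not
 * a proof. Ten messages only show that nothing went wrong once; many messages
 * sent from several threads without sleeps give a lost update more chances to
 * show up as a final count below the number of messages sent.
 *
 * The names `ActorStressTesting` and `countMessages`, and the sender and
 * message counts, are hypothetical choices for illustration. The only scalaz
 * calls used are ones that already appear in this file (`Actor.actor`, `!`,
 * `Strategy.Sequential`).
 */
object ActorStressTesting {
  import scalaz.concurrent.{Actor, Strategy}

  // Sends `perSender` messages from each of `senders` threads and returns
  // (messages sent, final value of the counter).
  def countMessages(senders: Int, perSender: Int): (Int, Int) = {
    var count = 0 // deliberately not @volatile, like the closed-over `var i`

    val a = Actor.actor[Unit] { _ =>
      count = count + 1
    } (Strategy.Sequential)

    val threads = (1 to senders).map { n =>
      new Thread(s"T$n") {
        override def run(): Unit = {
          var k = 0
          while (k < perSender) {
            a ! ()
            k += 1
          }
        }
      }
    }
    threads.foreach(_.start())

    // Assumption: with `Strategy.Sequential` every message is handled on one
    // of the sending threads, so once all senders have finished there is
    // nothing left in the mailbox. `join` also makes each sender's writes
    // visible to this thread before `count` is read below.
    threads.foreach(_.join())

    (senders * perSender, count)
  }

  def main(args: Array[String]): Unit = {
    val (sent, counted) = countMessages(senders = 4, perSender = 10000)
    // If effects of earlier messages are always visible, the two numbers match.
    println(s"sent $sent, counted $counted")
  }
}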