Created
August 24, 2010 13:59
-
-
Save samhendley/547596 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Forgive me if this issue has been addressed before but I wasn’t able to unearth anything describing this issue on the web or on this list. | |
I believe there is a bug in the MailBox implementation. If a receiveWithin block times out, the next message is lost. It appears that the lost message is sent to the previous receiveWithin block, though code inside that block isn’t run either. I have a test case that proves this behavior (http://gist.github.com/547596). Is there something I am missing and this is intentional? If so, the documentation needs to be updated to warn about this behavior. | |
In either case I am looking for a replacement for MailBox. I only stumbled onto this behavior because there was no way to peek into a MailBox and see if there is a message waiting. I was simulating that behavior by doing the shortest possible receiveWithin to check for already existing messages and then doing a longer wait once I had consumed all of the available messages and still hadn’t received the message I wanted. | |
Thanks for your time. | |
Sam |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.psi | |
import org.scalatest.FunSuite | |
import org.scalatest.junit.JUnitRunner | |
import org.junit.runner.RunWith | |
import scala.concurrent.{ MailBox, TIMEOUT } | |
/// This test shows that a receiveWithin that times out causes the next message to be
/// lost and I believe it also leaks the closure but I haven't been able to confirm
/// that.
/// http://lampsvn.epfl.ch/trac/scala/browser/scala/tags/R_2_8_0_final/src/library/scala/concurrent/MailBox.scala
/// The source code seems to indicate the problem is the receiveWithin closure is not
/// removed from the list of applicable receivers when the timeout expires.
@RunWith(classOf[JUnitRunner])
class MailBoxTests extends FunSuite {

  /// Message type exchanged through the mailbox in these tests.
  case class Test(i: Int)

  /// Checks that the next message in the mailbox is a Test carrying `value`.
  /// Yields false when a Test with a different payload arrives or when the
  /// 1 ms wait times out. The handler's result is used directly as the
  /// method's result, so no nonlocal `return` is thrown through the mailbox.
  def checkRecieved(mail: MailBox, value: Int): Boolean =
    mail.receiveWithin(1) {
      case Test(x) => x == value
      case TIMEOUT => false
    }

  /// This handler can only receive the timeout (not the message type!). This works
  /// as expected since it doesn't register as consuming the message of interest.
  /// Yields true once the 1 ms wait has timed out.
  def checkTimeoutCantRecieve(mail: MailBox): Boolean =
    mail.receiveWithin(1) {
      case TIMEOUT => true
    }

  /// This handler can receive either a timeout or the message type (which is the
  /// way we would expect to see it used in real code). The println shows whether
  /// this closure is still "alive" and gets handed a message.
  /// Yields true on timeout and false if a Test message is delivered instead.
  def checkTimeoutCanReceive(mail: MailBox): Boolean =
    mail.receiveWithin(1) {
      case Test(x) =>
        println("unexpected test value: " + x)
        false
      case TIMEOUT => true
    }

  test("Works with no timeouts") {
    val mail = new MailBox
    mail.send(Test(1))
    mail.send(Test(2))
    assert(checkRecieved(mail, 1))
    assert(checkRecieved(mail, 2))
    mail.send(Test(3))
    assert(checkRecieved(mail, 3))
  }

  test("Passes with timeout that cant recieve") {
    val mail = new MailBox
    mail.send(Test(1))
    assert(checkRecieved(mail, 1))
    assert(checkTimeoutCantRecieve(mail))
    mail.send(Test(2))
    assert(checkRecieved(mail, 2))
  }

  test("Fails because timeout eats message") {
    val mail = new MailBox
    assert(checkTimeoutCanReceive(mail))
    // fails because the timeout test eats the message, should see message on stdout
    mail.send(Test(2))
    assert(checkRecieved(mail, 2))
  }

  test("Passes with sacrificial message") {
    val mail = new MailBox
    assert(checkTimeoutCanReceive(mail))
    // sending an extra message that is sent to the timeout closure fixes the issue
    mail.send(Test(99))
    mail.send(Test(2))
    assert(checkRecieved(mail, 2))
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package org.psi | |
import scala.concurrent.MailBox | |
/**
 * A class that allows us to test multi-threaded code without reproducing the same
 * mailbox + receiveWithin machinery in every test.
 *
 * Values are published with `update`, which only enqueues a `StateChange` message.
 * `current` is reassigned exclusively inside `waitFor`, by the thread that consumes
 * the message, so it reflects the last value a waiter has seen rather than the last
 * value published.
 *
 * NOTE(review): scala.concurrent.MailBox in 2.8 reportedly keeps a timed-out receiver
 * registered, which can swallow the next message sent after a timeout. If that is
 * confirmed, a `waitFor` that returned false may cause a later update to be missed.
 *
 * @param intialValue value exposed through `current` until the first consumed update
 */
class SyncVar[T <: Any](intialValue: T) {

  /** Message wrapping a newly published value. */
  case class StateChange(value: T)

  /** Last value consumed by a waiter (or the initial value if none has been consumed). */
  var current: T = intialValue

  /** Queue carrying `StateChange` messages from publishers to waiters. */
  val mail = new MailBox

  /** Publishes `value` by sending it to the mailbox; does not touch `current`. */
  def update(value: T): Unit = {
    mail.send(StateChange(value))
  }

  /**
   * Waits until an update equal to `value` is consumed.
   *
   * @param value the value to wait for
   * @param msec  maximum time to wait, in milliseconds
   * @return true if a matching update was consumed in time, false otherwise
   */
  def waitUntil(value: T, msec: Long = 5000): Boolean = {
    waitFor(candidate => candidate == value, msec)
  }

  /**
   * Waits until an update different from `value` is consumed.
   *
   * @param value the value to wait to move away from
   * @param msec  maximum time to wait, in milliseconds
   * @return true if a differing update was consumed in time, false otherwise
   */
  def waitWhile(value: T, msec: Long = 5000): Boolean = {
    waitFor(candidate => candidate != value, msec)
  }

  /**
   * Consumes updates, storing each in `current`, until one satisfies `fun` or the
   * time budget is used up. The predicate is only tested against values received
   * from the mailbox; the value already in `current` is not checked up front.
   *
   * @param fun  predicate applied to every consumed value
   * @param msec maximum total time to wait, in milliseconds
   * @return true as soon as a consumed value satisfies `fun`; false on timeout
   */
  def waitFor(fun: T => Boolean, msec: Long = 5000): Boolean = {
    // Local import keeps this fix self-contained: the handler must accept TIMEOUT,
    // otherwise an expired wait is reported as a MatchError instead of `false`.
    import scala.concurrent.TIMEOUT

    val deadline = System.currentTimeMillis + msec

    @scala.annotation.tailrec
    def awaitMatch(): Boolean = {
      val remaining = deadline - System.currentTimeMillis
      // Guard before waiting: a non-positive budget means the deadline has passed.
      if (remaining <= 0) false
      else {
        val satisfied = mail.receiveWithin(remaining) {
          case StateChange(x) =>
            current = x
            fun(current)
          case TIMEOUT => false
        }
        // After a timeout the next iteration re-checks the deadline and stops.
        if (satisfied) true else awaitMatch()
      }
    }

    awaitMatch()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment