Last active
February 10, 2016 23:45
-
-
Save PierreMage/7ccfaeb39f31d977e04a to your computer and use it in GitHub Desktop.
Handling an actor's idleness in Akka
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.actor | |
import scala.concurrent.duration._ | |
/**
 * Thaasophobia is the fear of being idle. An actor mixing in this trait arms a receive
 * timeout of [[idleTimeout]] when it starts and calls [[handleIdleness]] whenever a
 * `ReceiveTimeout` message arrives; by default it then stops itself.
 *
 * `ReceiveTimeout` is intercepted in `aroundReceive`, so it is never passed on to the
 * actor's own `receive`. The trait is declared in package `akka.actor`, which gives it
 * access to the `protected[akka]` `aroundReceive` hook.
 *
 * WARNING: a thaasophobic actor with the default behaviour may stop before concurrent
 * operations it started have completed:
 * <ul>
 * <li>Don't use future callbacks inside the actor
 * <li>This actor should not expect to receive replies/ack messages
 * </ul>
 */
trait Thaasophobia extends Actor {

  /** Period without incoming messages after which [[handleIdleness]] is invoked. */
  def idleTimeout: Duration

  /**
   * Routes `ReceiveTimeout` to [[handleIdleness]]; every other message is forwarded
   * unchanged to the regular `aroundReceive` chain.
   */
  override protected[akka] def aroundReceive(receive: Receive, msg: Any): Unit =
    msg match {
      case ReceiveTimeout =>
        handleIdleness()
      case _ =>
        super.aroundReceive(receive, msg)
    }

  /** Reaction to an idle period. Stops this actor by default; override to customise. */
  def handleIdleness(): Unit = context.stop(self)

  /**
   * Arms the receive timeout when the actor starts.
   *
   * Chains to `super.preStart()` first so that start-up logic defined by the actor class
   * this trait is mixed into still runs. Actors that override `preStart` themselves must
   * likewise call `super.preStart()`, otherwise the timeout is never armed.
   */
  override def preStart(): Unit = {
    super.preStart()
    context.setReceiveTimeout(idleTimeout)
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.actor | |
import akka.testkit.TestActors.EchoActor | |
import akka.testkit.{ImplicitSender, TestActorRef} | |
import me.crowdmix.traffic.actors.ActorSpec | |
import scala.concurrent.duration._ | |
import scala.concurrent.{ExecutionContext, Future} | |
/**
 * Spec for [[Thaasophobia]]: an echo actor with the trait mixed in must answer messages,
 * stay alive while it keeps receiving messages, and terminate once it has been idle for
 * longer than its timeout, even if a future it started has not completed yet.
 */
class ThaasophobiaSpec extends ActorSpec("thaasophobia") with ImplicitSender {

  "Thaasophobic actor" should {

    "behave normally" in new Context {
      thaasophobicActor ! Msg
      expectMsg(Msg)
    }

    "stay alive" in new Context {
      watch(thaasophobicActor)
      import ExecutionContext.Implicits.global
      // Ping twice per timeout period so the actor never becomes idle.
      val keepAlive =
        system.scheduler.schedule(0.millis, _idleTimeout / 2, thaasophobicActor, Ignore)
      // Cancel the periodic ping when the test ends; otherwise it would keep firing
      // (and keep the actor alive) for as long as the actor system is running.
      try expectNoMsg(1.second)
      finally keepAlive.cancel()
    }

    "stop" in new Context {
      watch(thaasophobicActor)
      expectTerminated(thaasophobicActor)
    }

    "stop before future completes" in new Context {
      watch(thaasophobicActor)
      // The future sleeps 20% longer than the idle timeout, so the timeout fires first.
      thaasophobicActor ! Sleep((_idleTimeout * 1.2).toMillis)
      expectTerminated(thaasophobicActor)
    }
  }

  /** Per-test fixture: message protocol plus a fresh thaasophobic echo actor. */
  trait Context {
    case object Msg
    case object Ignore
    case class Sleep(millis: Long)
    case object Awake

    val _idleTimeout = 100.millis

    val thaasophobicActor =
      TestActorRef(new EchoActor with Thaasophobia {
        import ExecutionContext.Implicits.global

        override val idleTimeout = _idleTimeout

        override val receive: Receive = {
          // Swallowed on purpose: only resets the receive timeout.
          case Ignore =>
          case Sleep(millis) =>
            // Capture the sender now; sender() must not be called from the future's thread.
            val replyTo = sender()
            Future {
              Thread.sleep(millis)
              replyTo
            }.foreach(_ ! Awake) // foreach runs on success only, like the deprecated onSuccess
          case msg =>
            super.receive(msg)
        }
      })
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment