Created April 23, 2012 14:02
Use actor state from Future callbacks
package com.ometer.akka
import akka.dispatch.ExecutionContext
import akka.dispatch.Promise
import akka.dispatch.Future
import akka.dispatch.DefaultPromise
class Example extends SafeActor {
case object Whatever
var myMutableState = 10
override def safeReceive = {
case Whatever =>
// the basic idea is that it's OK to access myMutableState in the Future
// callbacks, normally it would not be. Future callbacks are kept in the
// context of this actor rather than executing on independent threads.
// Needless to say, this is a tradeoff: if your callbacks do some kind of
// blocking or long computation maybe you want an independent thread.
val f = Future({ myMutableState += 1; println("This callback should be serialized on the actor") }) => println("This callback should also be serialized on the actor"))
// futures created outside of this actor would be "imported" using safe()
// to avoid using whatever random ExecutionContext is set on them
// val f = safe(someService.doStuff())
* An implicit ExecutionContext in this actor dispatches
* any Future created in this actor serialized on the
* actor, so callbacks can touch the actor's state.
* "Foreign" futures should be adapted with safe()
* to run their callbacks in this actor.
* It is probably risky to "export" futures that
* execute in this actor to another actor.
* To subclass, override safeReceive instead of receive.
* Clunky but not sure what else to do.
* Naming this class "safe" is likely unsafe. ;-)
abstract class SafeActor extends Actor {
private[akka] lazy val _safeContext = new SafeActorExecutionContext(this)
* Futures created directly inside the SafeActor would use this, but
* futures from elsewhere need converting with safe()
protected implicit def safeContext: ExecutionContext = _safeContext
private[akka] def reportFailure(t: Throwable) = context.dispatcher.reportFailure(t)
* Convert a future to run its callbacks in this actor.
protected final def safe[T](f: Future[T]): Future[T] = {
f match {
case p: DefaultPromise[_] if p.executor eq _safeContext =>
// optimize the case where a future is already executing
// in this actor
case _ =>
private def internalReceive: Receive = {
case Code(runnable) =>
/** receive first intercepts SafeActor messages then calls safeReceive */
override final def receive: Receive = internalReceive orElse safeReceive
/** replacement for receive for subclasses to override */
protected def safeReceive: Receive
private case class Code(runnable: Runnable)
private class SafeActorExecutionContext(val safeActor: SafeActor) extends ExecutionContext {
override def execute(runnable: Runnable): Unit = {
safeActor.self ! Code(runnable)
override def reportFailure(t: Throwable): Unit = {
