Skip to content

Instantly share code, notes, and snippets.

@odd
Last active May 25, 2019 15:27
Show Gist options
  • Save odd/e356e178bfa178f727e3 to your computer and use it in GitHub Desktop.
Save odd/e356e178bfa178f727e3 to your computer and use it in GitHub Desktop.
LocalExecutor allows executing callback code locally inside an actor (where the actor state can be safely modified)
import akka.actor._
import scala.concurrent.duration.Duration
import scala.concurrent.{CanAwait, ExecutionContext, Future, TimeoutException}
import scala.util.{Failure, Success, Try}
/** Companion of the `LocalExecutor` trait: holds the message envelope and the
  * execution context used to hand work over to an actor.
  */
object LocalExecutor {

  /** Envelope sent to the target actor.
    *
    * Carries either a task to be run (`Success`) or a failure that was
    * reported to the execution context (`Failure`).
    */
  private case class Execute(runnable: Try[Runnable])

  /** An `ExecutionContext` that never runs anything itself.
    *
    * Every submitted task and every reported failure is wrapped in an
    * `Execute` message and sent to `target`.
    *
    * @param target the actor reference that receives the `Execute` messages
    */
  private class LocalExecutionContext(target: ActorRef) extends ExecutionContext {

    /** Wraps the submitted task and sends it to `target` instead of running it here. */
    override def execute(r: Runnable): Unit = {
      val envelope = Execute(Success(r))
      target ! envelope
    }

    /** Sends the reported failure to `target` as a failed `Execute` message. */
    override def reportFailure(t: Throwable): Unit = {
      val envelope = Execute(Failure(t))
      target ! envelope
    }
  }
}
/** Mixin for actors that want `Future` callbacks to be run as messages to
  * themselves.
  *
  * It adds a `.local` view to any `Future` and a `receiveExecute` handler.
  * The handler only takes effect if the actor includes it in its `receive`
  * behaviour, e.g. `receiveExecute orElse { ... }`.
  */
trait LocalExecutor { actor: Actor =>
  import LocalExecutor._

  // Execution context that turns every scheduled task into an Execute
  // message addressed to this actor's own reference.
  private val ec = new LocalExecutionContext(actor.self)

  /** Enriches a `Future` with the `local` method. */
  implicit class LocalFuture[T](f: Future[T]) {

    /** Returns a view of the wrapped future that delegates every member to it,
      * except that `onComplete` always registers the callback with this
      * actor's execution context.
      *
      * Other combinators are affected only as far as the standard library
      * routes them through `onComplete`, which depends on the Scala version.
      */
    def local: Future[T] = new Future[T] {

      override def isCompleted: Boolean = f.isCompleted

      override def value: Option[Try[T]] = f.value

      // The implicitly supplied executor is deliberately ignored here; the
      // callback is always registered with the actor-bound context.
      override def onComplete[U](t: (Try[T]) => U)(implicit executor: ExecutionContext): Unit =
        f.onComplete(t)(ec)

      @throws(classOf[InterruptedException])
      @throws(classOf[TimeoutException])
      override def ready(atMost: Duration)(implicit permit: CanAwait): this.type = {
        // Wait on the wrapped future, but hand back this view rather than the wrapped future.
        f.ready(atMost)
        this
      }

      @throws(classOf[Exception])
      override def result(atMost: Duration)(implicit permit: CanAwait): T =
        f.result(atMost)
    }
  }

  /** Handles `Execute` messages: rethrows a reported failure inside the
    * actor's message processing, or runs the carried task.
    */
  def receiveExecute: Receive = {
    case Execute(Failure(cause)) => throw cause
    case Execute(Success(task))  => task.run()
  }
}
/** Example actor showing the difference between a plain `Future` callback and
  * one registered through `.local`.
  */
class SampleActor extends Actor with LocalExecutor {
  import context.dispatcher

  // Actor-private state, updated from the `.local` callback below.
  var state: Int = 0

  /** `Execute` messages are tried first; anything else falls through to `onNumber`. */
  override def receive: Receive = receiveExecute orElse onNumber

  // For every Int received, starts an asynchronous calculation and attaches
  // one plain and one `.local` callback to it.
  private def onNumber: Receive = {
    case input: Int =>
      val scaled: Future[Int] = Future(input * 10) // asynchronous calculation involving the input

      for (result <- scaled) {
        // executed from outside this actor, modifying local state is NOT safe
        println(s"outside: $result")
      }

      for (result <- scaled.local) {
        // executed from within this actor, modifying local state is safe
        state += result
        println(s"inside: $state")
      }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment