Last active
May 25, 2019 15:27
-
-
Save odd/e356e178bfa178f727e3 to your computer and use it in GitHub Desktop.
LocalExecutor allows executing callback code locally inside an actor (where the actor state can be safely modified)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor._ | |
import scala.concurrent.duration.Duration | |
import scala.concurrent.{CanAwait, ExecutionContext, Future, TimeoutException} | |
import scala.util.{Failure, Success, Try} | |
object LocalExecutor { | |
private case class Execute(runnable: Try[Runnable]) | |
private class LocalExecutionContext(target: ActorRef) extends ExecutionContext { | |
override def execute(r: Runnable) = target ! Execute(Try(r)) | |
override def reportFailure(t: Throwable) = target ! Execute(Failure(t)) | |
} | |
} | |
trait LocalExecutor { actor: Actor => | |
import LocalExecutor._ | |
private val ec = new LocalExecutionContext(actor.self) | |
implicit class LocalFuture[T](f: Future[T]) { | |
def local: Future[T] = new Future[T] { | |
override def onComplete[U](t: (Try[T]) => U)(implicit executor: ExecutionContext) = f.onComplete(t)(ec) | |
override def isCompleted = f.isCompleted | |
override def value = f.value | |
@throws[Exception](classOf[Exception]) | |
override def result(atMost: Duration)(implicit permit: CanAwait) = f.result(atMost) | |
@throws[InterruptedException](classOf[InterruptedException]) | |
@throws[TimeoutException](classOf[TimeoutException]) | |
override def ready(atMost: Duration)(implicit permit: CanAwait) = { | |
f.ready(atMost) | |
this | |
} | |
} | |
} | |
def receiveExecute: Receive = { | |
case Execute(Success(r)) => r.run() | |
case Execute(Failure(t)) => throw t | |
} | |
} | |
class SampleActor extends Actor with LocalExecutor { | |
import context.dispatcher | |
var state: Int = 0 | |
override def receive: Receive = receiveExecute orElse { | |
case n: Int => | |
val f: Future[Int] = Future(n * 10) // asynchronous calculation involving n | |
f.foreach { m => | |
// executed from outside this actor, modifying local state is NOT safe | |
println("outside: " + m) | |
} | |
f.local.foreach { m => | |
// executed from within this actor, modifying local state is safe | |
state += m | |
println("inside: " + state) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment