Last active
August 29, 2015 13:56
-
-
Save ericacm/9338232 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic.AtomicInteger | |
import concurrent.{Future, Promise} | |
import akka.actor._ | |
import concurrent.duration._ | |
import util.control.NoStackTrace | |
object RemoteActorResolver { | |
val resolverCount = new AtomicInteger(0) | |
type LookupMap = Map[ActorPath, Promise[ActorRef]] | |
class ActorResolutionException(message: String) extends Exception(message) with NoStackTrace | |
def props(lookup: LookupMap, timeout: Duration, enableNoMatch: Boolean): Props = | |
Props(classOf[RemoteActorResolver], lookup, timeout, enableNoMatch) | |
/** | |
* Start resolving ActorPaths to ActorRefs | |
* | |
* @param factory `ActorContext` or `ActorSystem` | |
* @param paths Collection of `ActorPaths` to resolve | |
* @param timeout Timeout after which promises will be failed | |
* @param enableNoMatch If `true` then receiving `ActorIdentity(path, None)` will cause resolution | |
* for the path to fail. Otherwise it is ignored and only timeouts cause lookup | |
* failures. | |
* @return Map of `ActorPath` to `Promise[ActorRef]` where the `ActorPaths` are | |
* the `paths` and the promises will be completed when the paths are | |
* resolved (or failed if the timeout is reached). | |
*/ | |
def startResolution(factory: ActorRefFactory, paths: Seq[ActorPath], timeout: Duration = 5.minutes, | |
enableNoMatch: Boolean = false): LookupMap = { | |
val lookup = paths.map(_ -> Promise[ActorRef]()).toMap | |
factory.actorOf(props(lookup, timeout, enableNoMatch), "remoteActorResolver" + resolverCount.incrementAndGet()) | |
lookup | |
} | |
/** | |
* Resolve an ActorPath asynchronously | |
* | |
* @param lookup Map returned from `StartResolution` | |
* @param path One of the ActorPaths in the `lookup` map | |
* @return A `Future[ActorRef]` that will be completed when the `ActorPath` is resolved. | |
*/ | |
def resolvePathAsync(lookup: LookupMap, path: ActorPath): Future[ActorRef] = lookup(path).future | |
} | |
/** | |
* Using `Identify`/`ActorIdentity`, try to resolve all of the `ActorPath`s in lookup map. | |
* As each `ActorPath` is successfully or unsuccessfully resolved its corresponding `Promise` | |
* is completed or failed, respectively. | |
* | |
* Stops self after the last path is resolved. | |
* | |
* Note: Ignores the case where `ActorIdentity(_, None)` is received. This is because the actor might | |
* not be created on the target system yet. TODO: maybe handle it optionally. | |
* | |
* @param lookup Prepopulated map of `ActorPath` to `Promise[ActorRef]` | |
* @param timeout After timeout all promises corresponding to unresolved paths will be failed and | |
* this actor will stop itself. | |
* @param enableNoMatch If `true` then receiving `ActorIdentity(path, None)` will cause resolution | |
* for the path to fail. Otherwise it is ignored and only timeouts cause lookup | |
* failures. | |
*/ | |
class RemoteActorResolver(lookup: LookupMap, timeout: Duration, enableNoMatch: Boolean) extends Actor with ActorLogging { | |
val cycleTimeout = 5.seconds | |
var elapsed = Duration.Zero | |
context setReceiveTimeout cycleTimeout | |
sendIdentify() | |
def sendIdentify() = lookup foreach { case (path, promise) => | |
if (!promise.isCompleted) context.system.actorSelection(path) ! Identify(path) | |
} | |
def stopIfFinished(): Unit = if (lookup.values.forall(_.isCompleted)) context stop self | |
def exception(message: String) = new ActorResolutionException(message) | |
def receive = { | |
case ActorIdentity(path: ActorPath, Some(ref)) => | |
lookup(path) trySuccess ref | |
stopIfFinished() | |
case ActorIdentity(path: ActorPath, None) if enableNoMatch => | |
lookup(path) tryFailure exception(s"No actor exists at $path") | |
stopIfFinished() | |
case ReceiveTimeout => | |
elapsed += cycleTimeout | |
if (elapsed >= timeout) { | |
lookup filterNot(_._2.isCompleted) foreach { case (path, promise) => | |
promise tryFailure exception(s"Timeout waiting for $path to be resolved") | |
} | |
context stop self | |
} else { | |
sendIdentify() | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment