Skip to content

Instantly share code, notes, and snippets.

@lregnier
Last active July 13, 2017 21:22
Show Gist options
  • Save lregnier/920ba815b38852eaf9e085501cb09ad2 to your computer and use it in GitHub Desktop.
Save lregnier/920ba815b38852eaf9e085501cb09ad2 to your computer and use it in GitHub Desktop.
import akka.actor.{Actor, ActorIdentity, ActorLogging, ActorRef, FSM, Identify, Props, Stash, Terminated}
import scala.concurrent.duration.{Duration, _}
/**
 * Companion of the [[RemoteActorProxy]] actor: declares the FSM state and data
 * types it is parameterised with, plus the `Props` factory used to create it.
 */
object RemoteActorProxy {

  /** State names of the proxy's FSM. */
  sealed trait State

  /** The proxy holds no target reference and is sending `Identify` requests for `path`. */
  case object Identifying extends State

  /** The proxy holds a target reference and forwards incoming messages to it. */
  case object Active extends State

  /** State data carried by the proxy's FSM. */
  sealed trait Data

  /** No target reference is known. */
  case object UnidentifiedRemoteActor extends Data

  /** The target has been resolved to `remoteActor`. */
  final case class IdentifiedRemoteActor(remoteActor: ActorRef) extends Data

  /**
   * Builds the `Props` for a [[RemoteActorProxy]].
   *
   * @param path              actor path handed to `context.actorSelection` to locate the target
   * @param identifyFrequency interval of the repeating timer that triggers `Identify` requests
   *                          while the target is unresolved (defaults to 5 seconds)
   */
  def props(path: String, identifyFrequency: FiniteDuration = 5.seconds): Props =
    // `5.seconds` (method-call form) avoids the postfix-operator language feature.
    Props(new RemoteActorProxy(path, identifyFrequency))
}
/**
 * Proxy in front of the actor located at `path`.
 *
 * The proxy is a two-state FSM:
 *
 *  - '''Identifying''': an `Identify(path)` request is sent to
 *    `context.actorSelection(path)` immediately on entering the state and then
 *    again every `identifyFrequency`. Any other incoming message is stashed.
 *  - '''Active''': entered once an `ActorIdentity` carrying a reference arrives.
 *    The reference is watched, stashed messages are unstashed, and every
 *    incoming message is forwarded to it. When the watched reference
 *    terminates the proxy falls back to Identifying.
 *
 * @param path              actor path used both for the selection and as the `Identify` correlation id
 * @param identifyFrequency interval between repeated `Identify` requests while unresolved
 */
class RemoteActorProxy(path: String, identifyFrequency: FiniteDuration)
    extends Actor
    with FSM[RemoteActorProxy.State, RemoteActorProxy.Data]
    with Stash
    with ActorLogging {

  import RemoteActorProxy._

  /** Internal trigger: tells the proxy to send another `Identify` request. */
  private case object IdentifyRemoteActor

  /** Key of the FSM timer that periodically delivers [[IdentifyRemoteActor]]. */
  private val IdentifyTimer = "identify"

  startWith(Identifying, UnidentifiedRemoteActor)

  when(Identifying) {
    case Event(IdentifyRemoteActor, _) =>
      log.info("Sending Identify msg for path {}", path)
      // `path` doubles as the correlation id so replies can be matched below.
      context.actorSelection(path) ! Identify(path)
      stay()

    case Event(ActorIdentity(`path`, Some(remoteActor)), _) =>
      log.info("Remote Actor with path {} has been identified at following reference: {}.", path, remoteActor)
      // This class never sets a receive timeout; the call only resets it to "none".
      context.setReceiveTimeout(Duration.Undefined)
      // Watch the target so that its termination sends the proxy back to Identifying.
      context.watch(remoteActor)
      goto(Active) using IdentifiedRemoteActor(remoteActor)

    case Event(ActorIdentity(`path`, None), _) =>
      log.error("Remote Actor with path {} is not available.", path)
      stay()

    case Event(msg, _) =>
      log.info("Message received while actor not yet Active: {}.", msg)
      stash() // Kept until the target is resolved; replayed on entering Active.
      stay()
  }

  when(Active) {
    case Event(Terminated(terminated), _) =>
      log.info("Remote actor terminated: {}.", terminated)
      context.unwatch(terminated)
      goto(Identifying) using UnidentifiedRemoteActor

    // The two cases below keep the proxy's own protocol traffic away from the
    // target. Several Identify requests can be in flight at once (immediate
    // trigger plus repeating timer), so replies and triggers may still arrive
    // after the proxy has become Active; without these cases the catch-all
    // would forward them to the target as if they were user messages.
    case Event(IdentifyRemoteActor, _) =>
      log.debug("Ignoring stale identify trigger while Active.")
      stay()

    case Event(ActorIdentity(`path`, _), _) =>
      log.debug("Ignoring late ActorIdentity for path {} while Active.", path)
      stay()

    case Event(msg, IdentifiedRemoteActor(remoteActor)) =>
      // `forward` keeps the original sender, so replies bypass the proxy.
      remoteActor forward msg
      stay()
  }

  whenUnhandled {
    case Event(e, s) =>
      log.warning("Received unhandled msg {} in state {}/{}", e, stateName, s)
      stay()
  }

  onTransition {
    // NOTE(review): the very first Identify relies on this handler also running
    // for the initial state when initialize() is called — confirm against the
    // Akka version in use.
    case _ -> Identifying =>
      setTimer(IdentifyTimer, IdentifyRemoteActor, identifyFrequency, repeat = true)
      // Do not wait a full interval for the first attempt.
      self ! IdentifyRemoteActor

    case _ -> Active =>
      cancelTimer(IdentifyTimer)
      unstashAll() // Replay everything received while the target was unresolved.
  }

  initialize()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment