Last active
July 13, 2017 21:22
-
-
Save lregnier/920ba815b38852eaf9e085501cb09ad2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{Actor, ActorIdentity, ActorLogging, ActorRef, FSM, Identify, Props, Stash, Terminated} | |
import scala.concurrent.duration.{Duration, _} | |
object RemoteActorProxy { | |
sealed trait State | |
case object Identifying extends State | |
case object Active extends State | |
sealed trait Data | |
case object UnidentifiedRemoteActor extends Data | |
case class IdentifiedRemoteActor(remoteActor: ActorRef) extends Data | |
def props(path: String, identifyFrequency: FiniteDuration = 5 seconds): Props = { | |
Props(new RemoteActorProxy(path, identifyFrequency)) | |
} | |
} | |
class RemoteActorProxy(path: String, identifyFrequency: FiniteDuration) extends Actor with FSM[RemoteActorProxy.State, RemoteActorProxy.Data] with Stash with ActorLogging { | |
import RemoteActorProxy._ | |
case object IdentifyRemoteActor // Internal Message | |
startWith(Identifying, UnidentifiedRemoteActor) | |
when(Identifying) { | |
case Event(IdentifyRemoteActor, _) => { | |
log.info("Sending Identify msg for path {}", path) | |
val selection = context.actorSelection(path) | |
selection ! Identify(path) | |
stay | |
} | |
case Event(ActorIdentity(`path`, Some(remoteActor)), _) => { | |
log.info("Remote Actor with path {} has been identified at following reference: {}.", path, remoteActor) | |
context.setReceiveTimeout(Duration.Undefined) | |
context.watch(remoteActor) | |
goto(Active) using IdentifiedRemoteActor(remoteActor) | |
} | |
case Event(ActorIdentity(`path`, None), _) => { | |
log.error("Remote Actor with path {} is not available.", path) | |
stay | |
} | |
case Event(msg, _) => { | |
log.info("Message received while actor not yet Active: {}.", msg) | |
stash() // Stash the msg for later | |
stay | |
} | |
} | |
when(Active) { | |
case Event(Terminated(remoteActor), _) => { | |
log.info("Remote actor terminated: {}.", remoteActor) | |
context.unwatch(remoteActor) | |
goto(Identifying) using UnidentifiedRemoteActor | |
} | |
case Event(msg, IdentifiedRemoteActor(remoteActor)) => { | |
remoteActor forward msg | |
stay | |
} | |
} | |
whenUnhandled { | |
case Event(e, s) => { | |
log.warning("Received unhandled msg {} in state {}/{}", e, stateName, s) | |
stay | |
} | |
} | |
onTransition { | |
case _ -> Identifying => { | |
setTimer("identify", IdentifyRemoteActor, identifyFrequency, repeat = true) | |
self ! IdentifyRemoteActor | |
} | |
case _ -> Active => { | |
cancelTimer("identify") | |
unstashAll() // Unstash all received messages | |
} | |
} | |
initialize() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment