Skip to content

Instantly share code, notes, and snippets.

@aphexmunky
Last active August 29, 2015 14:25
Show Gist options
  • Select an option

  • Save aphexmunky/cc369aa65fd73fb036d9 to your computer and use it in GitHub Desktop.

Select an option

Save aphexmunky/cc369aa65fd73fb036d9 to your computer and use it in GitHub Desktop.
package actors.service
import akka.actor.{ ActorLogging, ActorRef }
import akka.persistence.PersistentActor
import akka.contrib.pattern._
import entities.dto.{TsuMovement, Tsu}
import TsuMovementCommands._
import common.GUIDCreator
import scalaz._
import actors.helpers.JSONPublisher
import play.api.libs.json._
import entities.dto.Buffer.TSUMovementEvent
/**
* TsuMovementManager represents an individual TSU's life.
*
* A TSU can be untracked, pending or tracked:
* ===========================================
*
* untracked: the current location isn't known or cared about
* pending: a movement has been planned for the TSU but not yet completed
* tracked: the current location is known and cared about
*
**/
class TsuMovementManager(locationManager: ActorRef) extends PersistentActor with JSONPublisher with ActorLogging {
import ShardRegion.Passivate
/**
* Path is used to represent unique actors per TSU. This is achieved through
* the clustering actor held by [[TsuMovementManagerService]]
**/
override def persistenceId = self.path.parent.name + "-" + self.path.name
/**
* Hold the state in the actor. Messages can cause changes to this state.
**/
implicit var state: TsuState = TsuState()
/**
* When a TSU movement completes or a TSU arrives in a location, it should publish to a buffer.
* These are the configuration values to establish the endpoint
**/
override val publishProtocol: String = context.system.settings.config.getString("tsu-management.buffer.movement.protocol")
override val publishQueueParams: String = context.system.settings.config.getString("tsu-management.buffer.movement.queueParams")
lazy val queueName: String = context.system.settings.config.getString("tsu-management.buffer.movement.queueName")
/**
* TSUMovementEvents are the type published to buffers
**/
implicit val tsu: Format[TSUMovementEvent] = Json.format[TSUMovementEvent]
/**
* Begin a TSU's life in an untracked state
**/
def receiveCommand: Receive = untracked
/**
* These are the possible states - they will all respond to info requests too
**/
def untracked = infoRequests orElse untrackedHandler
def tracked = infoRequests orElse trackedHandler
def pending = infoRequests orElse pendingHandler
/**
* These requests are purely for retriving state in various ways. They should not
* modify any state or persist anything.
**/
def infoRequests: Receive = {
case GetTsu(tsuid) => sender ! state.toDTO(tsuid)
}
/**
* A TSU starts in an untracked state. This means we don't have a concept
* of where the TSU is and don't actually care.
*
* An untracked TSU only has the ability to create pending movements and
* say it has arrived somewhere.
*
* Creating a pending movement puts the TSU in a pending state.
* Creating an arrived movement puts the TSU in a tracked state.
**/
def untrackedHandler: Receive = {
case CreateMovement(tsuid, toLocation) => {
persist(TsuMovementCreated(toLocation = toLocation)) { evt =>
log.info("TSU '{}' is going from an untracked state to pending location: {}", tsuid, toLocation)
state = state.allocate(toLocation)
sender ! state.toDTO(tsuid)
context.become(pending)
}
}
case CreateCompleteMovement(tsuid, toLocation) => {
persist(TsuArrivedMovement(toLocation = toLocation)) { evt =>
log.info("TSU '{}' is now tracked and has arrived at location: {}", tsuid, toLocation)
updateLocationManager(Some(toLocation))
// TODO: this needs the from address?
state = state.arrive(toLocation)
sender ! state.toDTO(tsuid)
locationManager ! TsuLocationCommands.SetTsuLocation(tsuid, toLocation)
context.become(tracked)
publish(TSUMovementEvent(toLocation), queueName)
}
}
case invalid: Cmd => {
log.warning("An untracked TSU {} received a command it couldn't handle: {}", state, invalid)
sender ! state.toDTO(self.path.name)
}
}
/**
* A pending TSU movement means there's an intention for a TSU to get somewhere.
*
* It can be updated through additional create movements, otherwise it can:
* Complete the movement and sets the TSU to a tracked state
* or
* Cancel the movement and sets the TSU to its previous state
**/
def pendingHandler: Receive = {
case CreateMovement(tsuid, toLocation) => {
persist(TsuMovementCreated(toLocation = toLocation)) { evt =>
log.info("TSU '{}' is pending and was expected at location: {} but is now expected at location: {}", tsuid, state.nextLocation.get, toLocation)
val status = state.nextLocation match {
case next if next == toLocation => Some("CREATED")
case _ => Some("UPDATED")
}
state = state.allocate(toLocation)
sender ! state.toDTO(tsuid)
}
}
case CompleteMovement(tsuid, toLocation) if state.nextLocation == Some(toLocation) => {
persist(TsuMovementCompleted()) { evt =>
log.info("TSU '{}' is now tracked and successfully arrived at location: {}", tsuid, state.nextLocation.get)
updateLocationManager(Some(toLocation))
state = state.completeAllocatedMovement
sender ! state.toDTO(tsuid)
context.become(tracked)
publish(TSUMovementEvent(toLocation), queueName)
}
}
case cancel: CancelMovement if Some(cancel.toLocation) == state.nextLocation => {
persist(TsuMovementCancelled()) { evt =>
log.info("TSU '{}' had its pending movement cancelled", cancel.tsuid)
state = state.cancelMovement
sender ! state.toDTO(cancel.tsuid)
state match {
case TsuState(Some(location), _) => context.become(tracked)
case TsuState(None, _) => context.become(untracked)
}
}
}
case invalid: Cmd => {
log.warning("A pending TSU {} received a command it couldn't handle: {}", state, invalid)
sender ! state.toDTO(self.path.name)
}
}
/**
* A TSU is tracked when we know and care about the location for it.
*
* A tracked TSU can have movements created for it.
* A tracked TSU can say it has arrived somewhere
* A tracked TSU can become untracked
**/
def trackedHandler: Receive = {
case CreateMovement(tsuid, toLocation) => {
val allocationReference = GUIDCreator.newGUID
persist(TsuMovementCreated(toLocation = toLocation)) { evt =>
log.info("TSU '{}' is going from a tracked state to pending location: {}", tsuid, toLocation)
state = state.allocate(toLocation)
sender ! state.toDTO(tsuid)
context.become(pending)
}
}
case CreateCompleteMovement(tsuid, toLocation) => {
persist(TsuArrivedMovement(toLocation = toLocation)) { evt =>
log.info("TSU '{}' is now tracked and has arrived at location: {}", tsuid, toLocation)
updateLocationManager(Some(toLocation))
state = state.arrive(toLocation)
sender ! state.toDTO(tsuid)
publish(TSUMovementEvent(toLocation), queueName)
}
}
case SetUntrackedMovement(tsuid) => {
persist(TsuUntrackedMovement()) { evt =>
log.info("TSU '{}' is being untracked from location: {}", tsuid, state.currentLocation.get)
updateLocationManager()
state = state.untrack
sender ! state.toDTO(tsuid)
context.become(untracked)
}
}
case invalid: Cmd => {
log.warning("A tracked TSU {} received a command it couldn't handle: {}", state, invalid)
sender ! state.toDTO(self.path.name)
}
}
/**
* Recovery of the state of the actor
**/
def receiveRecover: Receive = {
case mc: TsuMovementCreated => {
log.info("RECOVERING #{}: {}", lastSequenceNr, mc)
state = state.allocate(mc.toLocation)
context.become(pending)
}
case mc: TsuMovementCompleted => {
log.info("RECOVERING #{}: {}", lastSequenceNr, mc)
state = state.completeAllocatedMovement
context.become(tracked)
}
case mc: TsuMovementCancelled => {
log.info("RECOVERING #{}: {}", lastSequenceNr, mc)
state = state.cancelMovement
state match {
case TsuState(Some(location), _) => context.become(tracked)
case TsuState(None, _) => context.become(untracked)
}
}
case mc: TsuArrivedMovement => {
log.info("RECOVERING #{}: {}", lastSequenceNr, mc)
state = state.arrive(mc.toLocation)
context.become(tracked)
}
case mc: TsuUntrackedMovement => {
log.info("RECOVERING #{}: {}", lastSequenceNr, mc)
state = state.untrack
context.become(untracked)
}
}
/**
* Because we're key'ing TSU by tsuId, it becomes difficult to view TSU
* by other groupings such as by location. So for every final location
* change we call to update another persistent actor that maps by
* location.
*
**/
def updateLocationManager(newLocation: Option[String] = None): Unit = {
state.currentLocation.map(locationManager ! TsuLocationCommands.UnsetTsuLocation(self.path.name, _))
newLocation.map(locationManager ! TsuLocationCommands.SetTsuLocation(self.path.name, _))
}
/**
* Definition of the state the TSU can hold and operations to change that state
**/
case class TsuState(currentLocation: Option[String] = None, nextLocation: Option[String] = None) {
def allocate(toLocation: String): TsuState = this.copy(nextLocation = Some(toLocation))
def cancelMovement: TsuState = this.copy(nextLocation = None)
def completeAllocatedMovement: TsuState = TsuState(currentLocation = this.nextLocation)
def arrive(location: String) = TsuState(currentLocation = Some(location))
def untrack: TsuState = TsuState()
def toDTO(tsuId: String): Tsu = Tsu(tsuId = tsuId, currentLocation = this.currentLocation, nextLocation = this.nextLocation)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment