Last active
August 29, 2015 14:25
-
-
Save aphexmunky/cc369aa65fd73fb036d9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package actors.service | |
| import akka.actor.{ ActorLogging, ActorRef } | |
| import akka.persistence.PersistentActor | |
| import akka.contrib.pattern._ | |
| import entities.dto.{TsuMovement, Tsu} | |
| import TsuMovementCommands._ | |
| import common.GUIDCreator | |
| import scalaz._ | |
| import actors.helpers.JSONPublisher | |
| import play.api.libs.json._ | |
| import entities.dto.Buffer.TSUMovementEvent | |
| /** | |
| * TsuMovementManager represents an individual TSU's life. | |
| * | |
| * A TSU can be untracked, pending or tracked: | |
| * =========================================== | |
| * | |
| * untracked: the current location isn't known or cared about | |
| * pending: a movement has been planned for the TSU but not yet completed | |
| * tracked: the current location is known and cared about | |
| * | |
| **/ | |
| class TsuMovementManager(locationManager: ActorRef) extends PersistentActor with JSONPublisher with ActorLogging { | |
| import ShardRegion.Passivate | |
| /** | |
| * Path is used to represent unique actors per TSU. This is achieved through | |
| * the clustering actor held by [[TsuMovementManagerService]] | |
| **/ | |
| override def persistenceId = self.path.parent.name + "-" + self.path.name | |
| /** | |
| * Hold the state in the actor. Messages can cause changes to this state. | |
| **/ | |
| implicit var state: TsuState = TsuState() | |
| /** | |
| * When a TSU movement completes or a TSU arrives in a location, it should publish to a buffer. | |
| * These are the configuration values to establish the endpoint | |
| **/ | |
| override val publishProtocol: String = context.system.settings.config.getString("tsu-management.buffer.movement.protocol") | |
| override val publishQueueParams: String = context.system.settings.config.getString("tsu-management.buffer.movement.queueParams") | |
| lazy val queueName: String = context.system.settings.config.getString("tsu-management.buffer.movement.queueName") | |
| /** | |
| * TSUMovementEvents are the type published to buffers | |
| **/ | |
| implicit val tsu: Format[TSUMovementEvent] = Json.format[TSUMovementEvent] | |
| /** | |
| * Begin a TSU's life in an untracked state | |
| **/ | |
| def receiveCommand: Receive = untracked | |
| /** | |
| * These are the possible states - they will all respond to info requests too | |
| **/ | |
| def untracked = infoRequests orElse untrackedHandler | |
| def tracked = infoRequests orElse trackedHandler | |
| def pending = infoRequests orElse pendingHandler | |
| /** | |
| * These requests are purely for retriving state in various ways. They should not | |
| * modify any state or persist anything. | |
| **/ | |
| def infoRequests: Receive = { | |
| case GetTsu(tsuid) => sender ! state.toDTO(tsuid) | |
| } | |
| /** | |
| * A TSU starts in an untracked state. This means we don't have a concept | |
| * of where the TSU is and don't actually care. | |
| * | |
| * An untracked TSU only has the ability to create pending movements and | |
| * say it has arrived somewhere. | |
| * | |
| * Creating a pending movement puts the TSU in a pending state. | |
| * Creating an arrived movement puts the TSU in a tracked state. | |
| **/ | |
| def untrackedHandler: Receive = { | |
| case CreateMovement(tsuid, toLocation) => { | |
| persist(TsuMovementCreated(toLocation = toLocation)) { evt => | |
| log.info("TSU '{}' is going from an untracked state to pending location: {}", tsuid, toLocation) | |
| state = state.allocate(toLocation) | |
| sender ! state.toDTO(tsuid) | |
| context.become(pending) | |
| } | |
| } | |
| case CreateCompleteMovement(tsuid, toLocation) => { | |
| persist(TsuArrivedMovement(toLocation = toLocation)) { evt => | |
| log.info("TSU '{}' is now tracked and has arrived at location: {}", tsuid, toLocation) | |
| updateLocationManager(Some(toLocation)) | |
| // TODO: this needs the from address? | |
| state = state.arrive(toLocation) | |
| sender ! state.toDTO(tsuid) | |
| locationManager ! TsuLocationCommands.SetTsuLocation(tsuid, toLocation) | |
| context.become(tracked) | |
| publish(TSUMovementEvent(toLocation), queueName) | |
| } | |
| } | |
| case invalid: Cmd => { | |
| log.warning("An untracked TSU {} received a command it couldn't handle: {}", state, invalid) | |
| sender ! state.toDTO(self.path.name) | |
| } | |
| } | |
| /** | |
| * A pending TSU movement means there's an intention for a TSU to get somewhere. | |
| * | |
| * It can be updated through additional create movements, otherwise it can: | |
| * Complete the movement and sets the TSU to a tracked state | |
| * or | |
| * Cancel the movement and sets the TSU to its previous state | |
| **/ | |
| def pendingHandler: Receive = { | |
| case CreateMovement(tsuid, toLocation) => { | |
| persist(TsuMovementCreated(toLocation = toLocation)) { evt => | |
| log.info("TSU '{}' is pending and was expected at location: {} but is now expected at location: {}", tsuid, state.nextLocation.get, toLocation) | |
| val status = state.nextLocation match { | |
| case next if next == toLocation => Some("CREATED") | |
| case _ => Some("UPDATED") | |
| } | |
| state = state.allocate(toLocation) | |
| sender ! state.toDTO(tsuid) | |
| } | |
| } | |
| case CompleteMovement(tsuid, toLocation) if state.nextLocation == Some(toLocation) => { | |
| persist(TsuMovementCompleted()) { evt => | |
| log.info("TSU '{}' is now tracked and successfully arrived at location: {}", tsuid, state.nextLocation.get) | |
| updateLocationManager(Some(toLocation)) | |
| state = state.completeAllocatedMovement | |
| sender ! state.toDTO(tsuid) | |
| context.become(tracked) | |
| publish(TSUMovementEvent(toLocation), queueName) | |
| } | |
| } | |
| case cancel: CancelMovement if Some(cancel.toLocation) == state.nextLocation => { | |
| persist(TsuMovementCancelled()) { evt => | |
| log.info("TSU '{}' had its pending movement cancelled", cancel.tsuid) | |
| state = state.cancelMovement | |
| sender ! state.toDTO(cancel.tsuid) | |
| state match { | |
| case TsuState(Some(location), _) => context.become(tracked) | |
| case TsuState(None, _) => context.become(untracked) | |
| } | |
| } | |
| } | |
| case invalid: Cmd => { | |
| log.warning("A pending TSU {} received a command it couldn't handle: {}", state, invalid) | |
| sender ! state.toDTO(self.path.name) | |
| } | |
| } | |
| /** | |
| * A TSU is tracked when we know and care about the location for it. | |
| * | |
| * A tracked TSU can have movements created for it. | |
| * A tracked TSU can say it has arrived somewhere | |
| * A tracked TSU can become untracked | |
| **/ | |
| def trackedHandler: Receive = { | |
| case CreateMovement(tsuid, toLocation) => { | |
| val allocationReference = GUIDCreator.newGUID | |
| persist(TsuMovementCreated(toLocation = toLocation)) { evt => | |
| log.info("TSU '{}' is going from a tracked state to pending location: {}", tsuid, toLocation) | |
| state = state.allocate(toLocation) | |
| sender ! state.toDTO(tsuid) | |
| context.become(pending) | |
| } | |
| } | |
| case CreateCompleteMovement(tsuid, toLocation) => { | |
| persist(TsuArrivedMovement(toLocation = toLocation)) { evt => | |
| log.info("TSU '{}' is now tracked and has arrived at location: {}", tsuid, toLocation) | |
| updateLocationManager(Some(toLocation)) | |
| state = state.arrive(toLocation) | |
| sender ! state.toDTO(tsuid) | |
| publish(TSUMovementEvent(toLocation), queueName) | |
| } | |
| } | |
| case SetUntrackedMovement(tsuid) => { | |
| persist(TsuUntrackedMovement()) { evt => | |
| log.info("TSU '{}' is being untracked from location: {}", tsuid, state.currentLocation.get) | |
| updateLocationManager() | |
| state = state.untrack | |
| sender ! state.toDTO(tsuid) | |
| context.become(untracked) | |
| } | |
| } | |
| case invalid: Cmd => { | |
| log.warning("A tracked TSU {} received a command it couldn't handle: {}", state, invalid) | |
| sender ! state.toDTO(self.path.name) | |
| } | |
| } | |
| /** | |
| * Recovery of the state of the actor | |
| **/ | |
| def receiveRecover: Receive = { | |
| case mc: TsuMovementCreated => { | |
| log.info("RECOVERING #{}: {}", lastSequenceNr, mc) | |
| state = state.allocate(mc.toLocation) | |
| context.become(pending) | |
| } | |
| case mc: TsuMovementCompleted => { | |
| log.info("RECOVERING #{}: {}", lastSequenceNr, mc) | |
| state = state.completeAllocatedMovement | |
| context.become(tracked) | |
| } | |
| case mc: TsuMovementCancelled => { | |
| log.info("RECOVERING #{}: {}", lastSequenceNr, mc) | |
| state = state.cancelMovement | |
| state match { | |
| case TsuState(Some(location), _) => context.become(tracked) | |
| case TsuState(None, _) => context.become(untracked) | |
| } | |
| } | |
| case mc: TsuArrivedMovement => { | |
| log.info("RECOVERING #{}: {}", lastSequenceNr, mc) | |
| state = state.arrive(mc.toLocation) | |
| context.become(tracked) | |
| } | |
| case mc: TsuUntrackedMovement => { | |
| log.info("RECOVERING #{}: {}", lastSequenceNr, mc) | |
| state = state.untrack | |
| context.become(untracked) | |
| } | |
| } | |
| /** | |
| * Because we're key'ing TSU by tsuId, it becomes difficult to view TSU | |
| * by other groupings such as by location. So for every final location | |
| * change we call to update another persistent actor that maps by | |
| * location. | |
| * | |
| **/ | |
| def updateLocationManager(newLocation: Option[String] = None): Unit = { | |
| state.currentLocation.map(locationManager ! TsuLocationCommands.UnsetTsuLocation(self.path.name, _)) | |
| newLocation.map(locationManager ! TsuLocationCommands.SetTsuLocation(self.path.name, _)) | |
| } | |
| /** | |
| * Definition of the state the TSU can hold and operations to change that state | |
| **/ | |
| case class TsuState(currentLocation: Option[String] = None, nextLocation: Option[String] = None) { | |
| def allocate(toLocation: String): TsuState = this.copy(nextLocation = Some(toLocation)) | |
| def cancelMovement: TsuState = this.copy(nextLocation = None) | |
| def completeAllocatedMovement: TsuState = TsuState(currentLocation = this.nextLocation) | |
| def arrive(location: String) = TsuState(currentLocation = Some(location)) | |
| def untrack: TsuState = TsuState() | |
| def toDTO(tsuId: String): Tsu = Tsu(tsuId = tsuId, currentLocation = this.currentLocation, nextLocation = this.nextLocation) | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment