Last active
December 25, 2015 15:39
-
-
Save ryantanner/7000011 to your computer and use it in GitHub Desktop.
Why We Use Actors at Conspire
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Message protocol for the [[UserPipeline]] actor.
 */
object UserPipeline {

  /** Tells the pipeline to subscribe itself to its source actor's output. */
  case object Kickoff

  /**
   * Sent by the pipeline to its source actor to register a listener.
   *
   * @param ref the actor that wants to receive the source's output
   */
  final case class Listen(ref: ActorRef)

  /**
   * Sent by the pipeline to a freshly created worker.
   *
   * @param user the user the worker should start processing
   */
  final case class Start(user: User)

  /**
   * Received by the pipeline from a worker that finished its user.
   *
   * @param user the user whose processing succeeded
   */
  final case class Success(user: User)
}
/**
 * Spawns one `UserIndexer` child per user emitted by `source`, tracks which
 * child is handling which user, and routes `JobResponse` messages to the
 * matching child.
 *
 * Handled messages:
 *  - `UserPipeline.Kickoff`    – subscribe this actor to `source`.
 *  - `SourceOutput(users)`     – create, watch and start a worker per user.
 *  - `UserPipeline.Success`    – unwatch and stop the reporting worker, drop its entry.
 *  - `Terminated`              – a watched worker died; drop its entry and log it.
 *  - `JobResponse[_]`          – forward to the worker registered for `msg.user`.
 *
 * @param source an actor that produces a stream of users to be processed
 */
class UserPipeline(
  val source: ActorRef // This is an actor that produces a stream of users to be processed
) extends Actor with ActorLogging {

  // Immutable map held in a var: `+=` / `-=` below rebind the reference.
  // Only touched from inside `receive`, i.e. while this actor processes a message.
  var workers = Map.empty[User, ActorRef]

  // Handle potential failures in our child actors
  override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1.minute) {
    case _: java.sql.SQLException => Resume
    case _: NullPointerException  => Restart
    case _: Exception             => Escalate
  }

  def receive = {
    case UserPipeline.Kickoff =>
      // Tell the source actor we'd like to listen to its output
      source ! UserPipeline.Listen(self)

    // We've got users to process!
    case SourceOutput(users: Seq[User]) =>
      users.foreach { user =>
        // Create a worker actor for each user
        val worker = context.actorOf(Props[UserIndexer])
        context.watch(worker)             // watch the new worker for failure handling
        workers += (user -> worker)       // add the worker to our map
        worker ! UserPipeline.Start(user) // tell the new worker to start working on this user
      }

    // PipelineWorker succeeded!
    case UserPipeline.Success(user) =>
      // Unwatch before stopping so this deliberate stop is not reported
      // through the Terminated case below as an unexpected death.
      context.unwatch(sender)
      context.stop(sender)
      workers -= user

    // Terminated messages are generated when an actor dies and sent to any actor watching the dead actor
    case Terminated(deadActor) =>
      // handle an unexpected failure in the pipeline
      workers.find(_._2 == deadActor) foreach { case (user, _) =>
        workers -= user
        log.error("Worker died processing user {}", user)
      }

    // Route incoming messages from other services to the right worker
    case msg: JobResponse[_] =>
      // `forward` keeps the original sender, so the worker can reply to it directly
      workers.get(msg.user) match {
        case Some(worker) => worker forward msg
        case None         => log.error("Received msg for non-existant worker: " + msg)
      }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment