Skip to content

Instantly share code, notes, and snippets.

@ryantanner
Last active December 25, 2015 15:39
Show Gist options
  • Select an option

  • Save ryantanner/7000011 to your computer and use it in GitHub Desktop.

Select an option

Save ryantanner/7000011 to your computer and use it in GitHub Desktop.
Why We Use Actors at Conspire
object UserPipeline {
case class Listen(ref: ActorRef)
case class Start(user: User)
case class Success(user: User)
}
class UserPipeline(
val source: ActorRef // This is an actor that produces a stream of users to be processed
) extends Actor with ActorLogging {
val workers = Map.empty[User, ActorRef]
// Handle potential failures in our child actors
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1 minute) {
case _: java.sql.SQLException => Resume
case _: NullPointerException => Restart
case _: Exception => Escalate
}
def receive = {
case UserPipeline.Kickoff =>
source ! Listen(self) // Tell the source actor we'd like to listen to its output
// We've got users to process!
case SourceOutput(users: Seq[User]) =>
users { user =>
// Create a worker actor for each user
val worker = context.actorOf(Props[UserIndexer])
context.watch(worker) // watch the new worker for failure handling
workers += user -> worker // add the worker to our map
worker ! UserPipeline.Start(user) // tell the new worker to start working on this user
}
// PipelineWorker succeeded!
case UserPipeline.Success(user) =>
context.unwatch(sender) // stop watching this actor
context.stop(sender) // Stop the actor
workers -= user //
}
// Terminated messages are generated when an actor dies and sent to any actor watching the dead actor
case Terminated(deadActor) =>
// handle an unexpected failure in the pipeline
workers.find(_._2 == deadActor) foreach { case (user, _) =>
workers -= user
log.error("Worker died processing user {}", user)
}
// Route incoming messages from other services to the right worker
case msg: JobResponse[_] =>
// forward incoming messages to the appropriate worker
workers.get(msg.user) match {
case Some(worker) => worker forward msg
case None => log.error("Received msg for non-existant worker: " + msg)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment