-
-
Save ryantanner/7015577 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Reads queue of new users to index */ | |
class NewUserQueue extends Actor with ActorLogging { ... } | |
// Define our messages for the NewUserQueue | |
object NewUserQueue { | |
case class Listen(ref: ActorRef) | |
case class StopListen(ref: ActorRef) | |
case class NewUsers(users: Seq[User]) | |
} | |
/** Indexes documents for a new user */ | |
class UserIndexer extends Actor with ActorLogging { ... } | |
// Define our messages for the UserIndexer | |
object UserIndexer { | |
case class Start(user: User) | |
case class Success(user: User) | |
case class Failure(user: User) | |
} | |
/** Updates indexed timestamp and writes user to persistent storage */ | |
class UserPersister extends Actor with ActorLogging { ... } | |
// Define our messages for the UserPersister | |
object UserPersister { | |
case class UpdateTimestampAndWrite(user: User) | |
case class Success(user: User) | |
case class Failure(user: User) | |
} | |
/** Coordinates indexing of all new users */ | |
class UserPipeline( | |
val queue: ActorRef // This is a NewUserQueue actor that produces a stream of users to be processed | |
) extends Actor with ActorLogging { | |
// Create a persister actor to write users to DB | |
val persister = context.actorOf(Props[UserPersister]) | |
// Map of lightweight, single-use indexer actors | |
val indexers = Map.empty[User, ActorRef] | |
// Handle potential failures in our child actors | |
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5, withinTimeRange = 1 minute) { | |
case _: java.sql.SQLException => Resume | |
case _: NullPointerException => Restart | |
case _: Exception => Escalate | |
} | |
override def preStart = { | |
// Tell the queue to send new users our way | |
queue ! NewUserQueue.Listen(self) | |
} | |
override def postStop = { | |
// Tell the queue to stop sending users to us | |
queue ! NewUserQueue.StopListen(self) | |
} | |
def receive = { | |
// We've got users to process! | |
case NewUserQueue.NewUsers(users: Seq[User]) => | |
users.foreach { user => | |
val indexer = context.actorOf(Props[UserIndexer]) // Create a worker actor for each user | |
context.watch(indexer) // Watch the new worker for failure handling | |
indexers += user -> indexer // Add the worker to our map | |
indexer ! UserIndexer.Start(user) // Tell the new worker to start working on this user | |
} | |
// UserIndexer succeeded. Update timestamp and write to DB. | |
case UserIndexer.Success(user) => | |
context.unwatch(sender) // Stop watching this actor | |
context.stop(sender) // Stop the actor | |
indexers -= user // Clear from map | |
persister ! UserPersister.UpdateTimestampAndWrite(user) // Tell the database to update the user | |
// Persister succeeded. We're done! | |
case UserPersister.Success(user) => | |
log.info("Done!") | |
// UserIndexer failed | |
case UserIndexer.Failure(user) => | |
context.unwatch(sender) // stop watching this actor | |
context.stop(sender) // Stop the actor | |
indexers -= user | |
log.info("Indexer failed") | |
// Persister failed | |
case UserPersister.Failure(user) => | |
log.info("Persist failed") | |
// Terminated messages are generated when an actor dies and sent to any actor watching the dead actor | |
case Terminated(deadActor) => | |
// handle an unexpected failure in the pipeline | |
indexers.find(_._2 == deadActor) foreach { case (user, _) => | |
indexers -= user | |
log.error("Worker died processing user {}", user) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment