Created
November 29, 2012 11:08
-
-
Save hito-asa/4168252 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| private[worker] case class QueueMonitor(processor: Processor) extends Actor with Logger { | |
| private[this] val BLOCK_TIME = Some(1000L) | |
| def act { | |
| loop { | |
| { | |
| reactWithin(1000) { | |
| case Take => | |
| // blocking dequeue | |
| processor.queue.dequeue(BLOCK_TIME) map { message => | |
| log.debug("fetched from " + processor.queue) | |
| processor.actor ! message | |
| } | |
| case Stop => | |
| log.info("queue monitor [%s] stopped. remaining messages in local queue will be enqueued." format processor.queue) | |
| val messages = processor.queue.dequeueAll(BLOCK_TIME) | |
| processor.extractAll(messages).foreach(processor.queue.enqueue) | |
| sender ! 'normal | |
| exit('normal) | |
| case TIMEOUT => | |
| }: Unit | |
| } andThen { | |
| processor match { | |
| case FollowProcessor | UnfollowProcessor | DeleteProcessor | BlockProcessor | UnblockProcessor => | |
| Thread.sleep(200) | |
| case UserDeleteProcessor => | |
| Thread.sleep(500) | |
| case _ => | |
| } | |
| if (MongoMonitor.isAlive) { | |
| Actor.self ! Take | |
| } | |
| } | |
| } | |
| } | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment