Skip to content

Instantly share code, notes, and snippets.

@hito-asa
Created November 29, 2012 11:08
Show Gist options
  • Select an option

  • Save hito-asa/4168252 to your computer and use it in GitHub Desktop.

Select an option

Save hito-asa/4168252 to your computer and use it in GitHub Desktop.
private[worker] case class QueueMonitor(processor: Processor) extends Actor with Logger {
private[this] val BLOCK_TIME = Some(1000L)
def act {
loop {
{
reactWithin(1000) {
case Take =>
// blocking dequeue
processor.queue.dequeue(BLOCK_TIME) map { message =>
log.debug("fetched from " + processor.queue)
processor.actor ! message
}
case Stop =>
log.info("queue monitor [%s] stopped. remaining messages in local queue will be enqueued." format processor.queue)
val messages = processor.queue.dequeueAll(BLOCK_TIME)
processor.extractAll(messages).foreach(processor.queue.enqueue)
sender ! 'normal
exit('normal)
case TIMEOUT =>
}: Unit
} andThen {
processor match {
case FollowProcessor | UnfollowProcessor | DeleteProcessor | BlockProcessor | UnblockProcessor =>
Thread.sleep(200)
case UserDeleteProcessor =>
Thread.sleep(500)
case _ =>
}
if (MongoMonitor.isAlive) {
Actor.self ! Take
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment