Forked from krasserm/SupervisedFileConumerApp.scala
Created
February 17, 2011 07:30
-
-
Save ketankhairnar/831229 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// ########################################################### | |
// | |
// Demonstrates how to supervise an Akka consumer actor. | |
// | |
// The consumer consumes messages from a file endpoint: | |
// - successful message processing by the consumer will | |
// positively acknowledge the message receipt, causing | |
// the file endpoint to delete the file. | |
// - an exception during message processing will cause a | |
// supervisor to restart the consumer. Before restart, | |
// the consumer negatively acknowledges the message | |
// receipt which causes the file endpoint to redeliver | |
// the message. | |
// | |
// This example requires Akka 1.1-SNAPSHOT to run. | |
// | |
// The usage pattern shown here is not limited to file | |
// endpoints but can be used for any other Camel endpoints. | |
// | |
// ########################################################### | |
// ----------------------------------------------------------- | |
// Main: start CamelService, FileConsumer and Supervisor | |
// ----------------------------------------------------------- | |
import akka.actor._ | |
import akka.camel._ | |
import akka.config.Supervision._ | |
CamelServiceManager.startCamelService | |
// Mock repository (throws exception on first use of save method) | |
val repository = new Repository | |
// Supervised file consumer | |
val consumer = Actor.actorOf(new SupervisedFileConsumer(repository)) | |
// Supervisor | |
val supervisor = Supervisor( | |
SupervisorConfig( | |
AllForOneStrategy(List(classOf[Exception]), 5, 10000), | |
Supervise(consumer, Permanent) :: Nil)) | |
} | |
} | |
// ----------------------------------------------------------- | |
// File consumer actor and mock repository | |
// ----------------------------------------------------------- | |
class SupervisedFileConsumer(repo: Repository) extends Actor with Consumer { | |
// let this actor positively or negatively acknowledge the message receipt | |
override def autoack = false | |
// read file from data/input/actor directory and delete | |
// file once processing has been positively acknowledged | |
def endpointUri = "file:data/input/actor?delete=true" | |
def receive = { | |
case msg: Message => { | |
repo.save(msg) // will throw exception of first attempts | |
self.reply(Ack) // positively acknowledge receipt of message | |
// which causes the file endpoint to delete the file | |
} | |
} | |
override def preRestart(reason: scala.Throwable) { | |
reason match { | |
// negatively acknowledge receipt of message | |
// which causes the file endpoint to redeliver it | |
case e: Exception => self.reply(Failure(e)) | |
} | |
} | |
} | |
class Repository { | |
var firstRejected = false | |
// Throws exception on first call | |
def save(content: Any) { | |
if (!firstRejected) { | |
firstRejected = true | |
throw new Exception("save failed") | |
} | |
println("save succeeded") | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment