Last active
March 16, 2017 03:38
-
-
Save jasonqu/392b1aedddf34290e4af94fb16b746c9 to your computer and use it in GitHub Desktop.
Supervisor strategy examples (all-for-one and one-for-one) from [Akka Essentials](https://github.com/write2munish/Akka-Essentials)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.book.essence.chap6.AllForOne | |
import akka.actor.SupervisorStrategy._ | |
import akka.actor._ | |
import akka.pattern.ask | |
import akka.util.Timeout | |
import scala.concurrent.Await | |
import scala.concurrent.duration._ | |
/** Message asking a [[WorkerActor]] to reply with its current state. */
case object Result

/**
 * Worker used to demonstrate supervision: it remembers the last positive `Int`
 * it was sent and deliberately throws for every other input so that the
 * parent's supervisor strategy can be observed.
 *
 * Message handling:
 *  - `Int` greater than zero: stored as the new state
 *  - `Int` less than or equal to zero: throws `ArithmeticException`
 *  - [[Result]]: replies to the sender with the current state
 *  - a `NullPointerException` instance: throws a new `NullPointerException`
 *  - anything else: throws `IllegalArgumentException`
 */
class WorkerActor extends Actor with ActorLogging {
  // Last accepted value; every new actor instance starts from 0.
  var state: Int = 0

  /** Logs the identity of the instance being started. */
  override def preStart(): Unit =
    log.info("Starting WorkerActor instance hashcode # {}", this.hashCode())

  /** Logs the identity of the instance being stopped. */
  override def postStop(): Unit =
    log.info("Stopping WorkerActor instance hashcode # {}", this.hashCode())

  def receive: Receive = {
    case value: Int =>
      if (value <= 0)
        throw new ArithmeticException("Number equal or less than zero")
      else
        state = value
    case Result =>
      sender() ! state
    case _: NullPointerException =>
      // The exception travels as an ordinary message; receiving it triggers the failure.
      throw new NullPointerException("Null Value Passed")
    case _ =>
      throw new IllegalArgumentException("Wrong Argument")
  }
}
/**
 * Parent of two [[WorkerActor]] children, supervised with an `AllForOneStrategy`.
 *
 * Failure mapping used by the strategy:
 *  - `ArithmeticException`     -> `Resume`
 *  - `NullPointerException`    -> `Restart`
 *  - `IllegalArgumentException`-> `Stop`
 *  - any other `Exception`     -> `Escalate`
 *
 * All incoming messages are relayed to `workerActor1` only.
 */
class SupervisorActor extends Actor with ActorLogging {
  val workerActor1: ActorRef =
    context.actorOf(Props[WorkerActor], name = "workerActor1")
  // Never receives a message from this actor; it exists as a second child
  // covered by the same supervisor strategy.
  val workerActor2: ActorRef =
    context.actorOf(Props[WorkerActor], name = "workerActor2")

  // `10.seconds` instead of the postfix form `10 seconds`, which needs
  // `scala.language.postfixOps` to be in scope.
  override val supervisorStrategy: SupervisorStrategy =
    AllForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 10.seconds) {
      case e: ArithmeticException =>
        // TODO: without logging here, no error message is printed for this failure.
        //log.error(e, "before resume")
        Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }

  def receive: Receive = {
    case Result =>
      // Keep the original sender so the worker replies straight to the asker.
      workerActor1.forward(Result)
    case msg =>
      workerActor1 ! msg
  }
}
/**
 * Entry point of the all-for-one supervision demo.
 *
 * Sends a series of messages to the [[SupervisorActor]] and, after each one,
 * asks for the worker's state:
 *  1. `8`: accepted by the worker
 *  2. `-8`: the worker throws `ArithmeticException`
 *  3. a `NullPointerException` instance: the worker throws `NullPointerException`
 *  4. a `String`: the worker throws `IllegalArgumentException`; the follow-up
 *     ask is expected to time out
 *
 * The actor system is terminated when the sequence finishes or fails.
 */
object HelloAkkaScala extends App {
  import java.util.concurrent.TimeoutException

  val system = ActorSystem("faultTolerance")
  val log = system.log
  val supervisor = system.actorOf(Props[SupervisorActor], name = "supervisor")

  implicit val timeout: Timeout = Timeout(1.second)

  /**
   * Asks the supervisor for the worker's current state and blocks for the answer.
   * A fresh ask is issued on every call, so a previous reply is never reused.
   *
   * @return the state reported by the worker
   * @throws java.util.concurrent.TimeoutException if no reply arrives within `timeout`
   */
  private def currentState(): Int =
    Await.result((supervisor ? Result).mapTo[Int], timeout.duration)

  try {
    log.info("Sending value 8, no exceptions should be thrown! ")
    supervisor ! 8
    log.info("Value Received -> {}", currentState())

    log.info("Sending value -8, ArithmeticException should be thrown! Our Supervisor strategy says resume!")
    supervisor ! -8
    log.info("Value Received-> {}", currentState())

    log.info("Sending value null, NullPointerException should be thrown! Our Supervisor strategy says restart !")
    supervisor ! new NullPointerException
    log.info("Value Received-> {}", currentState())

    log.info("Sending value \"String\", IllegalArgumentException should be thrown! Our Supervisor strategy says Stop !")
    // Fire-and-forget: the worker throws instead of replying to this message.
    supervisor ! "Do Something"
    try {
      val unexpected = currentState()
      log.warning("no Value should be Received-> {}", unexpected)
    } catch {
      case _: TimeoutException =>
        log.info("No value received after Stop, as expected")
    }
  } finally {
    // Shut the actor system down so the JVM can exit once the demo is over.
    system.terminate()
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.book.essence.chap6.OneForOne | |
import akka.actor.SupervisorStrategy._ | |
import akka.actor._ | |
import akka.pattern.ask | |
import akka.util.Timeout | |
import scala.concurrent.Await | |
import scala.concurrent.duration._ | |
/** Request for a [[WorkerActor]] to send back the value it currently holds. */
case object Result

/**
 * Child actor for the one-for-one supervision demo. It keeps the most recent
 * positive `Int` it received and throws on purpose for all other inputs, so
 * each supervisor directive can be triggered from outside.
 *
 * Message handling:
 *  - `Int` greater than zero: stored as the new state
 *  - `Int` less than or equal to zero: throws `ArithmeticException`
 *  - [[Result]]: replies to the sender with the current state
 *  - a `NullPointerException` instance: throws a new `NullPointerException`
 *  - anything else: throws `IllegalArgumentException`
 */
class WorkerActor extends Actor with ActorLogging {
  // Most recently accepted value; initialised to 0 for every new instance.
  var state: Int = 0

  /** Logs the hash code of the instance that is starting. */
  override def preStart(): Unit =
    log.info("Starting WorkerActor instance hashcode # {}", this.hashCode())

  /** Logs the hash code of the instance that is stopping. */
  override def postStop(): Unit =
    log.info("Stopping WorkerActor instance hashcode # {}", this.hashCode())

  def receive: Receive = {
    case value: Int =>
      if (value <= 0)
        throw new ArithmeticException("Number equal or less than zero")
      else
        state = value
    case Result =>
      sender() ! state
    case _: NullPointerException =>
      // Receiving an exception object as a message is the trigger for this failure.
      throw new NullPointerException("Null Value Passed")
    case _ =>
      throw new IllegalArgumentException("Wrong Argument")
  }
}
/**
 * Parent of a single [[WorkerActor]] child, supervised with a `OneForOneStrategy`.
 *
 * Failure mapping used by the strategy:
 *  - `ArithmeticException`     -> `Resume`
 *  - `NullPointerException`    -> `Restart`
 *  - `IllegalArgumentException`-> `Stop`
 *  - any other `Exception`     -> `Escalate`
 *
 * Every incoming message is relayed to the child.
 */
class SupervisorActor extends Actor with ActorLogging {
  val childActor: ActorRef =
    context.actorOf(Props[WorkerActor], name = "workerActor")

  // `10.seconds` instead of the postfix form `10 seconds`, which needs
  // `scala.language.postfixOps` to be in scope.
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 10.seconds) {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }

  def receive: Receive = {
    case Result =>
      // Keep the original sender so the child replies straight to the asker.
      childActor.forward(Result)
    case msg =>
      childActor ! msg
  }
}
/**
 * Entry point of the one-for-one supervision demo.
 *
 * Sends a series of messages to the [[SupervisorActor]] and, after each one,
 * asks for the worker's state:
 *  1. `8`: accepted by the worker
 *  2. `-8`: the worker throws `ArithmeticException`
 *  3. a `NullPointerException` instance: the worker throws `NullPointerException`
 *  4. a `String`: the worker throws `IllegalArgumentException`; the follow-up
 *     ask is expected to time out
 *
 * The actor system is terminated when the sequence finishes or fails.
 */
object HelloAkkaScala extends App {
  import java.util.concurrent.TimeoutException

  val system = ActorSystem("faultTolerance")
  val log = system.log
  val supervisor = system.actorOf(Props[SupervisorActor], name = "supervisor")

  implicit val timeout: Timeout = Timeout(5.seconds)

  /**
   * Issues a new ask for the worker's state and blocks until the reply arrives.
   * Each call creates its own future, so an earlier reply cannot be observed again.
   *
   * @return the state reported by the worker
   * @throws java.util.concurrent.TimeoutException if no reply arrives within `timeout`
   */
  private def fetchState(): Int = {
    val reply = (supervisor ? Result).mapTo[Int]
    Await.result(reply, timeout.duration)
  }

  try {
    log.info("Sending value 8, no exceptions should be thrown! ")
    supervisor ! 8
    log.info("Value Received -> {}", fetchState())

    log.info("Sending value -8, ArithmeticException should be thrown! Our Supervisor strategy says resume!")
    supervisor ! -8
    log.info("Value Received-> {}", fetchState())

    log.info("Sending value null, NullPointerException should be thrown! Our Supervisor strategy says restart !")
    supervisor ! new NullPointerException
    log.info("Value Received-> {}", fetchState())

    log.info("Sending value \"String\", IllegalArgumentException should be thrown! Our Supervisor strategy says Stop !")
    // Fire-and-forget: the worker throws instead of replying to this message.
    supervisor ! "Do Something"
    try {
      val unexpected = fetchState()
      log.warning("no Value should be Received-> {}", unexpected)
    } catch {
      case _: TimeoutException =>
        log.info("No value received after Stop, as expected")
    }
  } finally {
    // Shut the actor system down so the JVM can exit once the demo is over.
    system.terminate()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment