Skip to content

Instantly share code, notes, and snippets.

@jasonqu
Last active March 16, 2017 03:38
Show Gist options
  • Save jasonqu/392b1aedddf34290e4af94fb16b746c9 to your computer and use it in GitHub Desktop.
Save jasonqu/392b1aedddf34290e4af94fb16b746c9 to your computer and use it in GitHub Desktop.
Supervisor strategy examples (AllForOne and OneForOne) from [Akka Essentials](https://github.com/write2munish/Akka-Essentials)
package akka.book.essence.chap6.AllForOne
import akka.actor.SupervisorStrategy._
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
// Query message: a WorkerActor answers it by sending its current `state` back to the sender.
case object Result
/**
 * Worker that remembers the last positive `Int` it was sent and deliberately
 * fails on every other kind of input, so the parent's supervisor strategy can
 * be observed.
 *
 * Message handling:
 *  - `Int` less than or equal to zero: throws `ArithmeticException`
 *  - positive `Int`: stored in `state`
 *  - `Result`: replies to the sender with the current `state`
 *  - a `NullPointerException` instance: throws a new `NullPointerException`
 *  - anything else: throws `IllegalArgumentException`
 */
class WorkerActor extends Actor with ActorLogging {

  // Last accepted value; every new instance starts from 0.
  var state: Int = 0

  // Lifecycle hooks only log, so instance creation and shutdown are visible in the output.
  override def preStart(): Unit =
    log.info("Starting WorkerActor instance hashcode # {}", this.hashCode())

  override def postStop(): Unit =
    log.info("Stopping WorkerActor instance hashcode # {}", this.hashCode())

  def receive: Receive = {
    case rejected: Int if rejected <= 0 =>
      throw new ArithmeticException("Number equal or less than zero")
    case accepted: Int =>
      state = accepted
    case Result =>
      sender ! state
    case _: NullPointerException =>
      // The exception object is only a trigger; a fresh one is thrown.
      throw new NullPointerException("Null Value Passed")
    case _ =>
      throw new IllegalArgumentException("Wrong Argument")
  }
}
/**
 * Parent of two `WorkerActor` children, supervised with an `AllForOneStrategy`.
 *
 * Only `workerActor1` ever receives messages from this actor; `workerActor2`
 * is created but never messaged here.
 */
class SupervisorActor extends Actor with ActorLogging {

  val workerActor1 = context.actorOf(Props[WorkerActor], name = "workerActor1")
  val workerActor2 = context.actorOf(Props[WorkerActor], name = "workerActor2")

  // Maps each failure type thrown by a child to a supervision directive.
  override val supervisorStrategy =
    AllForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 10.seconds) {
      case e: ArithmeticException =>
        // TODO: unless the failure is logged here, no error message is printed
        //log.error(e, "before resume")
        Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }

  def receive = {
    // Keep the original sender so the worker's reply goes straight back to the asker.
    case Result => workerActor1 forward Result
    // Everything else is passed on with this actor as the sender.
    case other  => workerActor1 ! other
  }
}
/**
 * Driver for the all-for-one supervision demo.
 *
 * Sends a valid value and then three inputs that make the worker throw
 * (`ArithmeticException`, `NullPointerException`, `IllegalArgumentException`),
 * querying the worker's state through the supervisor after each one.
 *
 * NOTE(review): the actor system is never shut down, so the JVM keeps running
 * after the last step; terminate `system` explicitly if the process should exit.
 */
object HelloAkkaScala extends App {
  val system = ActorSystem("faultTolerance")
  val log = system.log
  val supervisor = system.actorOf(Props[SupervisorActor], name = "supervisor")

  // Used both as the ask timeout and as the upper bound for blocking on the reply.
  implicit val timeout: Timeout = Timeout(1.second)

  /**
   * Sends a fresh `Result` query to the supervisor and blocks for the answer.
   *
   * @return the `Int` the worker replied with
   * @throws java.util.concurrent.TimeoutException if no reply arrives within `timeout`
   */
  private def queryState(): Int =
    Await.result((supervisor ? Result).mapTo[Int], timeout.duration)

  log.info("Sending value 8, no exceptions should be thrown! ")
  supervisor ! 8
  log.info("Value Received -> {}", queryState())

  log.info("Sending value -8, ArithmeticException should be thrown! Our Supervisor strategy says resume!")
  supervisor ! -8
  log.info("Value Received-> {}", queryState())

  log.info("Sending value null, NullPointerException should be thrown! Our Supervisor strategy says restart !")
  supervisor ! new NullPointerException
  log.info("Value Received-> {}", queryState())

  log.info("Sending value \"String\", IllegalArgumentException should be thrown! Our Supervisor strategy says Stop !")
  // Fire-and-forget: nothing replies to this message, so asking would only create a future that times out.
  supervisor ! "Do Something"
  // Issue a NEW query here. Awaiting the previous, already completed future
  // would just re-read the old value and never show that the worker is gone.
  try {
    val unexpected = queryState()
    println(s"result : $unexpected")
    log.info("no Value should be Received-> {}", unexpected)
  } catch {
    case _: java.util.concurrent.TimeoutException =>
      log.info("no Value Received, the query timed out as expected after Stop")
  }
}
package akka.book.essence.chap6.OneForOne
import akka.actor.SupervisorStrategy._
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
// Query message: a WorkerActor answers it by sending its current `state` back to the sender.
case object Result
/**
 * Worker that keeps the last positive `Int` it received and throws on purpose
 * for all other inputs, which lets the parent's supervisor strategy be exercised.
 *
 * Message handling:
 *  - `Int` less than or equal to zero: throws `ArithmeticException`
 *  - positive `Int`: stored in `state`
 *  - `Result`: replies to the sender with the current `state`
 *  - a `NullPointerException` instance: throws a new `NullPointerException`
 *  - anything else: throws `IllegalArgumentException`
 */
class WorkerActor extends Actor with ActorLogging {

  // Last accepted value; every new instance starts from 0.
  var state: Int = 0

  // Lifecycle hooks only log, making instance creation and shutdown visible.
  override def preStart(): Unit =
    log.info("Starting WorkerActor instance hashcode # {}", this.hashCode())

  override def postStop(): Unit =
    log.info("Stopping WorkerActor instance hashcode # {}", this.hashCode())

  def receive: Receive = {
    case rejected: Int if rejected <= 0 =>
      throw new ArithmeticException("Number equal or less than zero")
    case accepted: Int =>
      state = accepted
    case Result =>
      sender ! state
    case _: NullPointerException =>
      // The received exception object is only a trigger; a fresh one is thrown.
      throw new NullPointerException("Null Value Passed")
    case _ =>
      throw new IllegalArgumentException("Wrong Argument")
  }
}
/**
 * Parent of a single `WorkerActor` child, supervised with a `OneForOneStrategy`.
 * Every message this actor receives is handed on to that child.
 */
class SupervisorActor extends Actor with ActorLogging {

  val childActor = context.actorOf(Props[WorkerActor], name = "workerActor")

  // Maps each failure type thrown by the child to a supervision directive.
  override val supervisorStrategy =
    OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 10.seconds) {
      case _: ArithmeticException      => Resume
      case _: NullPointerException     => Restart
      case _: IllegalArgumentException => Stop
      case _: Exception                => Escalate
    }

  def receive = {
    // Keep the original sender so the child's reply goes straight back to the asker.
    case Result => childActor forward Result
    // Everything else is passed on with this actor as the sender.
    case other  => childActor ! other
  }
}
/**
 * Driver for the one-for-one supervision demo.
 *
 * Sends a valid value and then three inputs that make the worker throw
 * (`ArithmeticException`, `NullPointerException`, `IllegalArgumentException`),
 * querying the worker's state through the supervisor after each one.
 *
 * NOTE(review): the actor system is never shut down, so the JVM keeps running
 * after the last step; terminate `system` explicitly if the process should exit.
 */
object HelloAkkaScala extends App {
  val system = ActorSystem("faultTolerance")
  val log = system.log
  val supervisor = system.actorOf(Props[SupervisorActor], name = "supervisor")

  // Used both as the ask timeout and as the upper bound for blocking on the reply.
  implicit val timeout: Timeout = Timeout(5.seconds)

  /**
   * Sends a fresh `Result` query to the supervisor and blocks for the answer.
   *
   * @return the `Int` the worker replied with
   * @throws java.util.concurrent.TimeoutException if no reply arrives within `timeout`
   */
  private def queryState(): Int =
    Await.result((supervisor ? Result).mapTo[Int], timeout.duration)

  log.info("Sending value 8, no exceptions should be thrown! ")
  supervisor ! 8
  log.info("Value Received -> {}", queryState())

  log.info("Sending value -8, ArithmeticException should be thrown! Our Supervisor strategy says resume!")
  supervisor ! -8
  log.info("Value Received-> {}", queryState())

  log.info("Sending value null, NullPointerException should be thrown! Our Supervisor strategy says restart !")
  supervisor ! new NullPointerException
  log.info("Value Received-> {}", queryState())

  log.info("Sending value \"String\", IllegalArgumentException should be thrown! Our Supervisor strategy says Stop !")
  // Fire-and-forget: nothing replies to this message, so asking would only create a future that times out.
  supervisor ! "Do Something"
  // Issue a NEW query here. Awaiting the previous, already completed future
  // would just re-read the old value and never show that the worker is gone.
  try {
    val unexpected = queryState()
    log.info("no Value should be Received-> {}", unexpected)
  } catch {
    case _: java.util.concurrent.TimeoutException =>
      log.info("no Value Received, the query timed out as expected after Stop")
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment