Skip to content

Instantly share code, notes, and snippets.

@markhamstra
Created April 29, 2014 20:38
Show Gist options
  • Save markhamstra/d112d3064ea9755a4502 to your computer and use it in GitHub Desktop.
Save markhamstra/d112d3064ea9755a4502 to your computer and use it in GitHub Desktop.
Scheduler exception handling
package sample.hello
import akka.actor.Actor
/** Message protocol used when talking to a `Greeter` actor. */
object Greeter {

  /** Request asking the greeter to print its greeting. */
  case object Greet

  /** Reply the greeter sends to the requester once the greeting has been printed. */
  case object Done
}
/**
 * Actor that prints "Hello World!" when it receives `Greeter.Greet` and then
 * answers the sender with `Greeter.Done`.
 */
class Greeter extends Actor {

  /** Only `Greeter.Greet` is matched; the work itself lives in `greetAndAcknowledge`. */
  def receive: Receive = {
    case Greeter.Greet => greetAndAcknowledge()
  }

  /** Prints the greeting, then tells the sender of the current message that we are done. */
  private def greetAndAcknowledge(): Unit = {
    // throw new RuntimeException("Greeter Exception")
    println("Hello World!")
    sender ! Greeter.Done
  }
}
package sample.hello
import akka.actor.Actor
import akka.actor.Props
/**
 * Top-level actor of the sample: it spawns a `Greeter` child on start-up, asks it
 * to greet, and stops itself some time after the greeter reports completion.
 */
class HelloWorld extends Actor {

  /** Creates the child actor named "greeter" and immediately sends it `Greeter.Greet`. */
  override def preStart(): Unit =
    context.actorOf(Props[Greeter], "greeter") ! Greeter.Greet

  /**
   * On `Greeter.Done`, blocks inside the handler for 10 seconds and then stops this actor.
   * NOTE(review): the pause presumably exists to keep the system alive while other
   * activity (e.g. scheduled tasks) is observed; confirm before removing it.
   */
  def receive: Receive = {
    case Greeter.Done =>
      Thread.sleep(10000)
      context.stop(self)
  }
}
package sample.hello
import akka.actor.TestActorSystem
import akka.actor.Props
import akka.actor.ActorRef
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.Terminated
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
/**
 * Entry point of the experiment: starts a `TestActorSystem`, runs the
 * `HelloWorld` actor, and schedules a recurring task that randomly throws.
 */
object Main {

  // Installed when this object is initialized: a JVM-wide default handler that
  // prints any exception that escapes a thread.
  Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
    override def uncaughtException(thread: Thread, exception: Throwable): Unit =
      println(s"\n\n\n*** Default Uncaught Exception Handler ***\n$exception\n\n")
  })

  /**
   * Boots the actor system, creates the "helloWorld" actor together with a
   * "terminator" watching it, and schedules a task every second (starting
   * immediately) that throws when a random draw falls below 0.25 and
   * otherwise prints a message.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {
    val system = TestActorSystem("Hello")
    val helloWorld = system.actorOf(Props[HelloWorld], "helloWorld")
    system.actorOf(Props(classOf[Terminator], helloWorld), "terminator")

    //var foo = "foo"
    system.scheduler.schedule(0.seconds, 1.seconds) {
      //if (foo == "foo") throw new RuntimeException("Scheduled Exception")
      if (math.random < 0.25) {
        throw new RuntimeException("Scheduled Exception")
      } else {
        println(s"\nScheduled Event\n")
      }
    }
  }

  /**
   * Watches `ref` and shuts the whole actor system down once a `Terminated`
   * notification arrives.
   *
   * @param ref the actor whose termination triggers the shutdown
   */
  class Terminator(ref: ActorRef) extends Actor with ActorLogging {

    // Register the watch at construction time.
    context.watch(ref)

    def receive: Receive = {
      case Terminated(_) =>
        log.info("{} has terminated, shutting down system", ref.path)
        context.system.shutdown()
    }
  }
}
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Must be in akka.actor package as ActorSystemImpl is protected[akka].
package akka.actor
import scala.util.control.{ControlThrowable, NonFatal}
import com.typesafe.config.{Config, ConfigFactory}
/**
 * Factory for an `akka.actor.ActorSystem` backed by `TestActorSystemImpl`.
 *
 * As stated by the original author, the purpose is an actor system which refuses
 * to shut down in the event of a fatal exception; this is necessary as Spark
 * Executors are allowed to recover from fatal exceptions
 * (see org.apache.spark.executor.Executor).
 */
object TestActorSystem {

  /**
   * Creates and starts a system using the class loader found by
   * `ActorSystem.findClassLoader()` and the configuration loaded through it.
   *
   * @param name name of the actor system
   */
  def apply(name: String): ActorSystem = {
    val loader = ActorSystem.findClassLoader()
    apply(name, ConfigFactory.load(loader), loader)
  }

  /**
   * Creates and starts a system with the given configuration and the class loader
   * found by `ActorSystem.findClassLoader()`.
   *
   * @param name   name of the actor system
   * @param config configuration handed to the system
   */
  def apply(name: String, config: Config): ActorSystem =
    apply(name, config, ActorSystem.findClassLoader())

  /**
   * Creates a `TestActorSystemImpl` from the given arguments and starts it.
   *
   * @param name        name of the actor system
   * @param config      configuration handed to the system
   * @param classLoader class loader handed to the system
   */
  def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem = {
    val system = new TestActorSystemImpl(name, config, classLoader)
    system.start()
  }
}
/**
 * `ActorSystemImpl` variant whose uncaught-exception handler only reports the
 * throwable on standard output instead of delegating to the parent's handler.
 *
 * @param name              name of the actor system
 * @param applicationConfig configuration passed through to `ActorSystemImpl`
 * @param classLoader       class loader passed through to `ActorSystemImpl`
 */
private[akka] class TestActorSystemImpl(
    override val name: String,
    applicationConfig: Config,
    classLoader: ClassLoader)
  extends ActorSystemImpl(name, applicationConfig, classLoader) {

  /**
   * Handler that prints a banner followed by the uncaught throwable.
   *
   * The throwable is included in the output (same layout as the default handler
   * installed in `Main`) so that the failure is not swallowed without a trace.
   * NOTE(review): presumably `ActorSystemImpl` installs this handler on the
   * threads it creates; confirm against the Akka version in use.
   */
  protected override def uncaughtExceptionHandler: Thread.UncaughtExceptionHandler =
    new Thread.UncaughtExceptionHandler() {
      def uncaughtException(thread: Thread, cause: Throwable): Unit = {
        println(s"\n\n\n*** Uncaught Exception Handler ***\n$cause\n\n")
      }
    }

  /**
   * Classifies a throwable as fatal or not.
   *
   * @param e the throwable to classify
   * @return `false` when `e` is matched by `NonFatal` or is an `InterruptedException`,
   *         `NotImplementedError` or `ControlThrowable`; `true` for everything else
   */
  def isFatalError(e: Throwable): Boolean = e match {
    case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
      false
    case _ =>
      true
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment