Created
August 18, 2012 17:49
-
-
Save derekwyatt/3388701 to your computer and use it in GitHub Desktop.
Shutdown pattern
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Spawn your futures. Each Future starts running as soon as it is created,
// so the work items execute concurrently rather than one after another.
val fs = (1 to 100).map { i =>
  Future { Thread.sleep(i); i }
}
// Wrap all of the work up into a single Future. It completes once every
// element has completed, and fails if any of them fails.
val f = Future.sequence(fs)
try {
  // Wait on it forever - i.e. until it's done. Await.result rethrows the
  // failure when the combined Future did not complete successfully.
  Await.result(f, Duration.Inf)
} finally {
  // Shut down even if waiting threw; without the finally a single failed
  // Future would skip the shutdown and leave the ActorSystem running.
  system.shutdown()
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Reaper whose reaction to "everything is dead" is to shut down the
 * ActorSystem that this actor is running in.
 */
class ProductionReaper extends Reaper {

  /** Called once no watched actor is left; shuts the ActorSystem down. */
  override def allSoulsReaped(): Unit = {
    context.system.shutdown()
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{Actor, ActorRef, Terminated} | |
import scala.collection.mutable.ArrayBuffer | |
/** Companion object holding the messages understood by a Reaper actor. */
object Reaper {

  /**
   * Registration message: sent to a Reaper to have it watch `ref`.
   *
   * @param ref the actor that should be watched
   */
  case class WatchMe(ref: ActorRef)
}
/**
 * Base actor that watches every actor registered through
 * `Reaper.WatchMe` and invokes [[allSoulsReaped]] once the list of
 * watched actors becomes empty after a `Terminated` message.
 *
 * Subclasses decide what "everything is dead" means for them by
 * implementing [[allSoulsReaped]].
 */
abstract class Reaper extends Actor {
  import Reaper._

  // Keep track of what we're watching. Each ref is stored at most once.
  val watched = ArrayBuffer.empty[ActorRef]

  /**
   * Derivations need to implement this method. It's the hook that's
   * called when everything's dead.
   */
  def allSoulsReaped(): Unit

  /** Watch registered actors and check for termination. */
  final def receive: Receive = {
    case WatchMe(ref) =>
      // Ignore repeated registrations of the same ref. `-=` below removes
      // only a single occurrence, so a duplicate entry would keep
      // `watched` non-empty after that actor's Terminated was handled and
      // the hook would never be called.
      if (!watched.contains(ref)) {
        context.watch(ref)
        watched += ref
      }
    case Terminated(ref) =>
      watched -= ref
      if (watched.isEmpty) allSoulsReaped()
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.{ActorSystem, Props, ActorRef} | |
import akka.testkit.{TestKit, ImplicitSender, TestProbe} | |
import org.scalatest.{WordSpec, BeforeAndAfterAll} | |
import org.scalatest.matchers.MustMatchers | |
/**
 * Our test reaper. Instead of shutting anything down it sends the
 * string "Dead" to the snooper once all the souls have been reaped.
 *
 * @param snooper the actor that is notified
 */
class TestReaper(snooper: ActorRef) extends Reaper {
  override def allSoulsReaped(): Unit = {
    snooper ! "Dead"
  }
}
/**
 * Spec for the Reaper: registers two probes, stops them, and expects the
 * TestReaper to report "Dead" to the TestKit's testActor.
 */
class ReaperSpec extends TestKit(ActorSystem("ReaperSpec"))
  with ImplicitSender
  with WordSpec
  with BeforeAndAfterAll
  with MustMatchers {

  import Reaper._

  // Shut the ActorSystem down once the suite has finished
  override def afterAll(): Unit = {
    system.shutdown()
  }

  "Reaper" should {
    "work" in {
      // Four dummy actors; only the first and the last are registered
      val probeA = TestProbe()
      val probeB = TestProbe()
      val probeC = TestProbe()
      val probeD = TestProbe()

      // The reaper under test reports back to the TestKit's testActor
      val reaper = system.actorOf(Props(new TestReaper(testActor)))

      // Register a couple of them, then stop those same actors
      val souls = Seq(probeA.ref, probeD.ref)
      souls.foreach { soul => reaper ! WatchMe(soul) }
      souls.foreach { soul => system.stop(soul) }

      // The hook must have fired
      expectMsg("Dead")
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
What happens if an uncaught exception is thrown in a watched actor? It seems the watched actor would not be able to send a Terminated message to its parent actor, so the ArrayBuffer Reaper.watched would never become empty.