Last active
December 20, 2015 04:09
-
-
Save ponkotuy/6069151 to your computer and use it in GitHub Desktop.
Akka Actor Sample
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
name := "AkkaSample" | |
organization := "jp.co.datasection" | |
scalaVersion := "2.10.2" | |
libraryDependencies ++= Seq( | |
"com.typesafe.akka" %% "akka-actor" % "2.2.0", | |
"com.typesafe.akka" %% "akka-contrib" % "2.2.0" | |
) | |
resolvers ++= Seq( | |
"Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/" | |
) | |
scalacOptions ++= Seq( | |
"-deprecation", | |
"-unchecked", | |
"-feature" | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor._ | |
import scala.collection.mutable.ArrayBuffer | |
object Reaper { | |
case class WatchMe(ref: ActorRef) | |
} | |
abstract class Reaper extends Actor { | |
import Reaper._ | |
// Keep track of what we're watching | |
val watched = ArrayBuffer.empty[ActorRef] | |
// Derivations need to implement this method. It's the | |
// hook that's called when everything's dead | |
def allSoulsReaped(): Unit | |
// Watch and check for termination | |
final def receive = { | |
case WatchMe(ref) => | |
context.watch(ref) | |
watched += ref | |
case Terminated(ref) => | |
watched -= ref | |
if (watched.isEmpty) allSoulsReaped() | |
} | |
} | |
class ShutdownReaper extends Reaper { | |
// Shutdown | |
def allSoulsReaped() { | |
context.system.shutdown() | |
println("Shutdown from Reaper") | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.concurrent.duration._ | |
import akka.actor._ | |
object ReaperSample { | |
class Printer(header: String) extends Actor { | |
context.setReceiveTimeout(30.milliseconds) | |
def receive = { | |
case ReceiveTimeout => context.stop(self) | |
case x => | |
println(header + x) | |
Thread.sleep(1000) | |
} | |
} | |
def main(args: Array[String]) { | |
val system = ActorSystem() | |
val printerA = system.actorOf(Props(new Printer("a: "))) | |
val printerB = system.actorOf(Props(new Printer("b: "))) | |
val reaper = system.actorOf(Props[ShutdownReaper]) | |
reaper ! Reaper.WatchMe(printerA) | |
reaper ! Reaper.WatchMe(printerB) | |
printerA ! 1 | |
printerB ! 2 | |
printerB ! 3 | |
printerA ! 4 | |
printerA ! 5 | |
// Terminate when output "a: 5" | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.concurrent.duration._ | |
import akka.actor._ | |
import akka.routing._ | |
object RouterSample { | |
class Printer extends Actor { | |
context.setReceiveTimeout(30.milliseconds) | |
def receive= { | |
case ReceiveTimeout => context.stop(self) | |
case x => | |
Thread.sleep(1000) // Should not block normally | |
println(x) | |
} | |
} | |
def main(args: Array[String]) { | |
val system = ActorSystem("sample") | |
val router = system.actorOf(Props[Printer].withRouter(RoundRobinRouter(4))) | |
val reaper = system.actorOf(Props[ShutdownReaper]) | |
reaper ! Reaper.WatchMe(router) | |
(1 to 10) foreach { router ! _ } | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.concurrent.duration._ | |
import akka.actor._ | |
import akka.contrib.throttle._ | |
import akka.contrib.throttle.Throttler._ | |
object ThrottlerSample { | |
class Printer extends Actor { | |
context.setReceiveTimeout(1030.milliseconds) // Require more 1sec for throttler | |
def receive = { | |
case ReceiveTimeout => context.stop(self) | |
case x => println(x) | |
} | |
} | |
def main(args: Array[String]) { | |
val system = ActorSystem("sample") | |
val printer = system.actorOf(Props[Printer]) | |
val throttler = system.actorOf( | |
Props(new TimerBasedThrottler(3 msgsPer (1.seconds))) | |
) | |
throttler ! SetTarget(Some(printer)) | |
// Reaper Settings | |
val reaper = system.actorOf(Props[ShutdownReaper]) | |
reaper ! Reaper.WatchMe(printer) | |
// These three messages will be sent to the printer immediately | |
throttler ! "1" | |
throttler ! "2" | |
throttler ! "3" | |
// These two will wait at least until 1 second has passed | |
throttler ! "4" | |
throttler ! "5" | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment