// Created July 16, 2015 07:26
// Sample code in "プログラミング言語のパラダイムシフトーScalaから見る関数型と並列性時代の幕開けー"
import akka.actor.{Actor, ActorLogging, ActorRef, ActorSystem, PoisonPill, Props}
import akka.routing.{SmallestMailboxPool}
import akka.testkit.{TestKit, ImplicitSender}
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import jp.trifort.ifu.StopSystemAfterAll
import org.scalatest.{MustMatchers, WordSpecLike}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
/**
 * Actor that keeps a running tally of the letter 'A' (upper or lower case)
 * found in the text messages it receives.
 *
 * Protocol:
 *  - `"How many?"`: replies to the sender with the current tally as an `Int`.
 *  - any other `String`: adds the number of 'a'/'A' characters to the tally.
 *
 * Messages of any other type are not handled by `receive`.
 */
class CountAActor extends Actor {
  /** Number of 'a'/'A' characters counted so far. */
  var totalA = 0

  def receive: Receive = {
    // The query must be matched before the generic String case; otherwise
    // "How many?" itself would be counted as text (it contains an 'a').
    case "How many?" => sender() ! totalA
    // Compare against both cases directly instead of upper-casing a copy of
    // the whole string first.
    case text: String => totalA += text.count(ch => ch == 'A' || ch == 'a')
  }
}
/** Companion holding the `Props` factory for [[CountAActor]]. */
object CountAActor {
  /** `Props` describing how to create a new [[CountAActor]]. */
  def props: Props = Props(new CountAActor)
}
/** Mixin that supplies the configuration of the router in front of the counting actors. */
trait RouterCreator {
  /** `Props` for a `SmallestMailboxPool` of 100 routees created from `CountAActor.props`. */
  def createRouter: Props = SmallestMailboxPool(100).props(CountAActor.props)
}
/**
 * Front actor that spreads text messages over a pool of [[CountAActor]]
 * routees and aggregates their tallies on request.
 *
 * Protocol:
 *  - `"How many?"`: broadcasts the query to every routee, with a freshly
 *    created [[Reducer]] as the reply target; the reducer sends the summed
 *    result to the original sender.
 *  - anything else: forwarded unchanged to the pool router.
 */
class CountARouter extends Actor with RouterCreator {
  import akka.routing.Broadcast

  /** Pool router created from `createRouter` as a child of this actor. */
  val countARouter = context.actorOf(createRouter)

  def receive: Receive = {
    case hm @ "How many?" =>
      // One reducer per query. It is told to expect 100 partial sums, which
      // has to match the number of routees configured in `createRouter`.
      val reducer = context.actorOf(Reducer.props(sender(), 100))
      // Routees reply to the reducer, not to this actor or the original sender.
      countARouter.tell(Broadcast(hm), reducer)

    case msg => countARouter forward msg
  }
}
/** Companion holding the `Props` factory for [[CountARouter]]. */
object CountARouter {
  /** `Props` describing how to create a new [[CountARouter]]. */
  def props: Props = Props(new CountARouter)
}
/**
 * Short-lived aggregator: adds up `Int` messages and, once `maxCount` of
 * them have arrived, sends the sum to `sendTo` and asks itself to stop.
 *
 * @param sendTo   actor that receives the final sum
 * @param maxCount number of `Int` messages to wait for before replying
 */
class Reducer(sendTo: ActorRef, maxCount: Int) extends Actor {
  /** Sum of the partial results received so far. */
  var total = 0
  /** Number of partial results received so far. */
  var count = 0

  def receive: Receive = {
    case sum: Int =>
      total += sum
      count += 1
      if (count == maxCount) {
        sendTo ! total
        // All expected partial sums are in; this reducer is no longer needed.
        self ! PoisonPill
      }
  }
}
/** Companion holding the `Props` factory for [[Reducer]]. */
object Reducer {
  /**
   * `Props` describing how to create a [[Reducer]].
   *
   * @param sendTo   actor that receives the final sum
   * @param maxCount number of `Int` messages to aggregate before replying
   */
  def props(sendTo: ActorRef, maxCount: Int): Props = Props(new Reducer(sendTo, maxCount))
}
/**
 * Actor that throws when it receives `"Crash!!!"` and logs each lifecycle
 * hook, so the effect of a failure on the actor lifecycle can be observed.
 *
 * NOTE(review): the logging calls in the hooks were reconstructed from a
 * garbled source; the use of `log.info` (rather than another level) is an
 * assumption. Please confirm.
 */
class CrashActor extends Actor with ActorLogging {
  def receive: Receive = {
    case "Crash!!!" => throw new Exception("crashed!")
  }

  override def preStart(): Unit = log.info("preStart")

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info("preRestart")
    super.preRestart(reason, message)
  }

  // Deliberately does not call super.postRestart, matching the original:
  // only the hook name is logged here.
  override def postRestart(reason: Throwable): Unit = log.info("postRestart")

  override def postStop(): Unit = log.info("postStop")
}
/** Companion holding the `Props` factory for [[CrashActor]]. */
object CrashActor {
  /** `Props` describing how to create a new [[CrashActor]]. */
  def props: Props = Props(new CrashActor)
}
/**
 * Parent of a single [[CrashActor]] child; every message it receives is
 * forwarded to that child. No `supervisorStrategy` is overridden here.
 */
class Supervisor extends Actor {
  /** The supervised child, created when this actor is constructed. */
  val crashActor = context.actorOf(CrashActor.props)

  def receive: Receive = {
    // `forward` keeps the original sender on the message.
    case msg => crashActor forward msg
  }
}
/** Companion holding the `Props` factory for [[Supervisor]]. */
object Supervisor {
  /** `Props` describing how to create a new [[Supervisor]]. */
  def props: Props = Props(new Supervisor)
}
/**
 * Spec exercising the samples in this file: `Future` composition, parallel
 * collections, the counting actors, the router/reducer pair and the
 * supervisor/crash pair.
 */
class SampleTest extends TestKit(ActorSystem("SampleSystem", ConfigFactory.empty()))
  with WordSpecLike with MustMatchers with ImplicitSender with StopSystemAfterAll {

  "Future" must {
    // `Future { ... }`, `map` and `flatMap` need an implicit execution context;
    // the actor system's dispatcher is used, as in the actor tests below.
    implicit val executor: scala.concurrent.ExecutionContext = system.dispatcher

    "map and flatMap" in {
      val futureMessage = Future {
        Thread.sleep(1000); 1
      }.flatMap(value => Future {
        Thread.sleep(1000); value + 1
      }).map(s => s"This is a value of future after $s seconds")

      Await.result(futureMessage, 5.seconds) must be("This is a value of future after 2 seconds")
    }

    "for comprehension" in {
      // The second future is created only after the first completes, because
      // it depends on s1.
      val futureMessage = for {
        s1 <- Future {
          Thread.sleep(1000); 1
        }
        s2 <- Future {
          Thread.sleep(1000); s1 + 1
        }
      } yield s"This is a value of future after $s2 seconds"

      Await.result(futureMessage, 5.seconds) must be("This is a value of future after 2 seconds")
    }
  }

  "parallel collection" must {
    "behave same as standard one" in {
      // NOTE(review): this statement was garbled in the source and has been
      // reconstructed as "parallel pipeline equals sequential pipeline";
      // please confirm against the original.
      val list = (0 to 9999).toList
      val parallelSum = list.par.map(_ + 1).filter(_ % 2 == 0).fold(0)(_ + _)
      val sequentialSum = list.map(_ + 1).filter(_ % 2 == 0).fold(0)(_ + _)
      parallelSum must be(sequentialSum)
    }
  }

  "CountAActor" must {
    "count A and a" in {
      val countAActor = system.actorOf(CountAActor.props, "countAActor")
      countAActor ! "na" * 16
      countAActor ! "BATMAN!"
      countAActor ! "How many?"
      // 16 'a' from "na" * 16 plus 2 'A' from "BATMAN!". Consuming the reply
      // also keeps it from leaking into the expectations of later tests.
      expectMsg(18)
    }

    "count A and a in parallel" in {
      implicit val dispatcher = system.dispatcher
      implicit val timeout: Timeout = Timeout(5.seconds)
      import akka.pattern.ask

      val countAActor1 = system.actorOf(CountAActor.props, "countAActor1")
      val countAActor2 = system.actorOf(CountAActor.props, "countAActor2")
      countAActor1 ! "na" * 16
      countAActor2 ! "BATMAN!"

      val futures = Seq(countAActor1, countAActor2).map(_ ? "How many?").map(_.mapTo[Int])
      val result = Future.sequence(futures).map(_.reduce(_ + _))
      Await.result(result, 5.seconds) must be(18)
    }
  }

  "Router" must {
    "route messages" in {
      val router = system.actorOf(CountARouter.props, "CountARouter")
      (1 to 10000).foreach(_ => router ! "BATMAN!")
      router ! "How many?"
      // Each "BATMAN!" contributes two 'A's.
      expectMsg(10000 * 2)
    }
  }

  "Supervisor" must {
    "crush CrashActor" in {
      // No assertion here: the message only triggers the exception in the
      // child so its lifecycle hooks run.
      val supervisor = system.actorOf(Supervisor.props, "supervisor")
      supervisor ! "Crash!!!"
    }
  }
}
