Created
July 16, 2015 07:26
-
-
Save TanUkkii007/dacbf45a65492808c53f to your computer and use it in GitHub Desktop.
サンプルコード in "プログラミング言語のパラダイムシフトーScalaから見る関数型と並列性時代の幕開けー http://www.slideshare.net/TanUkkii/functional-and-concurencyinscala"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor._ | |
import akka.routing.{SmallestMailboxPool} | |
import akka.testkit.{TestKit, ImplicitSender} | |
import com.typesafe.config.ConfigFactory | |
import jp.trifort.ifu.StopSystemAfterAll | |
import org.scalatest.{MustMatchers, WordSpecLike} | |
import scala.concurrent.{Await, Future} | |
import akka.util.Timeout | |
import scala.concurrent.duration._ | |
/**
 * Actor that keeps a running tally of how many times the letter "A"
 * (upper or lower case) appears in the strings it receives.
 *
 * Protocol:
 *  - `"How many?"` replies to the sender with the current tally as an `Int`.
 *  - any other `String` is upper-cased and its 'A' characters are added to the tally.
 */
class CountAActor extends Actor {
  // Running tally; only mutated from inside `receive`.
  var totalA = 0

  def receive: Receive = {
    // This case must stay first: the query is itself a String and would
    // otherwise be counted as text by the case below.
    case "How many?" =>
      sender() ! totalA

    case text: String =>
      val upperCased = text.toUpperCase()
      totalA = totalA + upperCased.count(ch => ch == 'A')
  }
}
/** Companion holding the `Props` factory for [[CountAActor]]. */
object CountAActor {
  /** Builds the `Props` used to create a new [[CountAActor]] instance. */
  def props: Props =
    Props(new CountAActor)
}
/**
 * Factory mix-in that describes the pool of [[CountAActor]] routees.
 *
 * `routeeCount` is the single source of truth for the pool size: it is used
 * both to size the pool and to tell the [[Reducer]] how many partial results
 * to wait for. An implementation that overrides `createRouter` with a
 * different pool size must override `routeeCount` to match.
 */
trait RouterCreator {
  /** Number of routees in the pool created by `createRouter`. */
  def routeeCount: Int = 100

  /** `Props` for a smallest-mailbox pool of `routeeCount` [[CountAActor]] routees. */
  def createRouter: Props = SmallestMailboxPool(routeeCount).props(CountAActor.props)
}

/**
 * Front actor for a pool of [[CountAActor]] routees.
 *
 * Ordinary messages are forwarded to the pool. A `"How many?"` query is
 * broadcast to every routee, and a fresh [[Reducer]] is created to sum the
 * per-routee answers and reply to the original asker.
 */
class CountARouter extends Actor with RouterCreator {
  val countARouter: ActorRef = context.actorOf(createRouter)

  def receive: Receive = {
    case hm @ "How many?" =>
      import akka.routing.Broadcast
      // The reducer is passed as the sender of the broadcast so that every
      // routee's reply goes to it. It expects exactly one reply per routee,
      // hence the shared `routeeCount` rather than a second literal.
      val reducer = context.actorOf(Reducer.props(sender(), routeeCount))
      countARouter.tell(Broadcast(hm), reducer)

    case msg =>
      countARouter forward msg
  }
}
/** Companion holding the `Props` factory for [[CountARouter]]. */
object CountARouter {
  /** Builds the `Props` used to create a new [[CountARouter]] instance. */
  def props: Props =
    Props(new CountARouter)
}
/**
 * Accumulates `Int` partial results and reports their sum.
 *
 * Each received `Int` is added to `total`. Once exactly `maxCount` values
 * have arrived, the sum is sent to `sendTo` and the actor sends itself a
 * `PoisonPill`.
 *
 * @param sendTo   actor that receives the final sum
 * @param maxCount number of partial results to wait for before replying
 */
class Reducer(sendTo: ActorRef, maxCount: Int) extends Actor {
  var total = 0
  var count = 0

  def receive: Receive = {
    case sum: Int =>
      total = total + sum
      count = count + 1
      val allPartialsReceived = count == maxCount
      if (allPartialsReceived) {
        sendTo ! total
        self ! PoisonPill
      }
  }
}
/** Companion holding the `Props` factory for [[Reducer]]. */
object Reducer {
  /**
   * Builds the `Props` used to create a [[Reducer]].
   *
   * @param sendTo   actor that receives the final sum
   * @param maxCount number of partial results the reducer waits for
   */
  def props(sendTo: ActorRef, maxCount: Int): Props =
    Props(new Reducer(sendTo, maxCount))
}
/**
 * Actor used to demonstrate lifecycle hooks around a failure.
 *
 * It throws when it receives `"Crash!!!"` and logs a line from each
 * lifecycle hook so the order of callbacks can be read from the log.
 */
class CrashActor extends Actor with ActorLogging {
  def receive: Receive = {
    case "Crash!!!" => throw new Exception("crashed!")
  }

  // The hooks below use explicit `: Unit =` instead of procedure syntax,
  // which is deprecated in newer Scala versions.

  override def preStart(): Unit = {
    log.info("preStart")
  }

  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    log.info("preRestart")
    // Keep the inherited restart behaviour after logging.
    super.preRestart(reason, message)
  }

  override def postRestart(reason: Throwable): Unit = {
    log.info("postRestart")
    // Keep the inherited restart behaviour after logging.
    super.postRestart(reason)
  }

  override def postStop(): Unit = {
    log.info("postStop")
  }
}
/** Companion holding the `Props` factory for [[CrashActor]]. */
object CrashActor {
  /** Builds the `Props` used to create a new [[CrashActor]] instance. */
  def props: Props =
    Props(new CrashActor)
}
/**
 * Parent of a single [[CrashActor]] child; every message it receives is
 * forwarded to that child with the original sender preserved.
 *
 * No `supervisorStrategy` is overridden here, so whatever Akka's default
 * strategy is applies when the child throws.
 */
class Supervisor extends Actor {
  val crashActor = context.actorOf(CrashActor.props)

  def receive: Receive = {
    case msg =>
      crashActor.forward(msg)
  }
}
/** Companion holding the `Props` factory for [[Supervisor]]. */
object Supervisor {
  /** Builds the `Props` used to create a new [[Supervisor]] instance. */
  def props: Props =
    Props(new Supervisor)
}
/**
 * Sample spec exercising Futures, parallel collections and the actors
 * defined in this file (plain actor, router + reducer, supervised crash).
 */
class SampleTest extends TestKit(ActorSystem("SampleSystem", ConfigFactory.empty()))
  with WordSpecLike with MustMatchers with ImplicitSender with StopSystemAfterAll {

  "Future" must {
    import scala.concurrent.ExecutionContext.Implicits.global

    "map and flatMap" in {
      val futureMessage = Future {
        Thread.sleep(1000); 1
      }.flatMap(value => Future {
        Thread.sleep(1000); value + 1
      }).map(s => s"This is a value of future after $s seconds")
      Await.result(futureMessage, 5.seconds) must be("This is a value of future after 2 seconds")
    }

    "for comprehension" in {
      val futureMessage = for {
        s1 <- Future {
          Thread.sleep(1000); 1
        }
        s2 <- Future {
          Thread.sleep(1000); s1 + 1
        }
      } yield s"This is a value of future after $s2 seconds"
      Await.result(futureMessage, 5.seconds) must be("This is a value of future after 2 seconds")
    }
  }

  "parallel collection" must {
    "behave same as standard one" in {
      val list = (0 to 9999).toList
      val sequentialSum = list.map(_ + 1).filter(_ % 2 == 0).fold(0)(_ + _)
      val parallelSum = list.par.map(_ + 1).filter(_ % 2 == 0).fold(0)(_ + _)
      // Both results are named and compared explicitly. Ending a line with
      // `must be` and putting the expected value on the next line parses as
      // two separate statements and asserts nothing.
      parallelSum must be(sequentialSum)
    }
  }

  "CountAActor" must {
    "count A and a" in {
      val countAActor = system.actorOf(CountAActor.props, "countAActor")
      countAActor ! "na" * 16
      countAActor ! "BATMAN!"
      countAActor ! "How many?"
      // 16 from "na" * 16 plus 2 from "BATMAN!".
      expectMsg(18)
    }

    "count A and a in parallel" in {
      implicit val dispatcher: scala.concurrent.ExecutionContext = system.dispatcher
      implicit val timeout: Timeout = Timeout(5.seconds)
      import akka.pattern.ask
      val countAActor1 = system.actorOf(CountAActor.props, "countAActor1")
      val countAActor2 = system.actorOf(CountAActor.props, "countAActor2")
      countAActor1 ! "na" * 16
      countAActor2 ! "BATMAN!"
      val futures = Seq(countAActor1, countAActor2).map(_ ? "How many?").map(_.mapTo[Int])
      val result = Future.sequence(futures).map(_.reduce(_ + _))
      Await.result(result, 5.seconds) must be(18)
    }
  }

  "Router" must {
    "route messages" in {
      val router = system.actorOf(CountARouter.props, "CountARouter")
      Stream.fill(10000)("BATMAN!").foreach(router ! _)
      router ! "How many?"
      // Each "BATMAN!" contributes two 'A's.
      expectMsg(10000 * 2)
    }
  }

  "Supervisor" must {
    "crush CrashActor" in {
      val supervisor = system.actorOf(Supervisor.props, "supervisor")
      supervisor ! "Crash!!!"
      // No assertion: the sleep only gives the lifecycle log lines time to
      // be emitted before the test ends.
      Thread.sleep(1000)
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment