Last active
December 16, 2015 04:49
-
-
Save ponkotuy/5379540 to your computer and use it in GitHub Desktop.
Akka ActorでPipeline処理
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.concurrent.Await | |
import scala.concurrent.duration._ | |
import akka.actor._ | |
import akka.pattern.gracefulStop | |
object Main extends App { | |
implicit val duration = 5.seconds | |
val system = ActorSystem() | |
// 初期化時に、データを渡すAcotrRefを引数にして初期化する | |
val ref = { | |
val refA2 = system.actorOf(Props(new A(None)), name="a2") | |
val refB1 = system.actorOf(Props(new B(Some(refA2))), name="b1") | |
system.actorOf(Props(new A(Some(refB1))), name="a1") | |
} | |
// 実際に10個のデータを投げている | |
(1 to 10).foreach(ref ! _.toString) | |
// 終了処理 | |
Await.result(gracefulStop(ref, duration)(system), duration) | |
system.shutdown() | |
} | |
// Actorその1 | |
class A(next: Option[ActorRef])(implicit duration: FiniteDuration) | |
extends PipeActor(next) { | |
override def receive = { | |
case x: String => | |
Thread.sleep(100) | |
println("Receive: %s (%s)".format(x, self.path)) | |
super.receive(x) | |
} | |
} | |
// Actorその2 | |
class B(next: Option[ActorRef])(implicit duration: FiniteDuration) | |
extends PipeActor(next) { | |
override def receive = { | |
case x: String => | |
Thread.sleep(100) | |
println("Receive: %s (%s)".format(x, self.path)) | |
super.receive(x) | |
} | |
} | |
// Pipelineに使うActorの基底クラス。現状だと一直線にしかできない | |
abstract class PipeActor(next: Option[ActorRef])(implicit duration: FiniteDuration) | |
extends Actor { | |
def receive = { | |
// nextのActorRefに処理を投げる | |
case x => | |
println("Pipe") | |
next.foreach { _ ! x } | |
} | |
// stopもnextに投げる | |
override def postStop() { | |
println("Shutdown") | |
next.foreach { it => | |
Await.result(gracefulStop(it, duration)(context.system), duration) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment