Created
May 27, 2020 06:06
-
-
Save brianhsu/7ec5adabaae8a45e08a0fcf18ecdffa0 to your computer and use it in GitHub Desktop.
akka.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.example | |
import akka.actor.typed.{ActorRef, ActorSystem, Behavior, SupervisorStrategy, Terminated} | |
import akka.actor.typed.scaladsl.{Behaviors, PoolRouter, Routers} | |
import com.example.ParentActor._ | |
import com.example.Transformer.{ConvertPlease, GracefulShutdown} | |
object Transformer { | |
sealed trait Request | |
case class ConvertPlease(str: String, replyTo: ActorRef[DispatcherMessage]) extends Request | |
case object GracefulShutdown extends Request | |
def apply(): Behavior[Request] = | |
Behaviors.setup { context => | |
Behaviors.receiveMessage { | |
case ConvertPlease(str, replyTo) => | |
Thread.sleep(scala.util.Random.between(1000, 5000)) | |
replyTo ! ConvertedLine(str + s".....converted") | |
Behaviors.same | |
case GracefulShutdown => | |
println("===> Got GracefulShutdown") | |
Behaviors.stopped { () => | |
println("====> Cleanup") | |
} | |
} | |
} | |
} | |
object ParentActor { | |
sealed trait DispatcherMessage | |
case class Line(line: String) extends DispatcherMessage | |
case class ConvertedLine(line: String) extends DispatcherMessage | |
case object End extends DispatcherMessage | |
def apply(): Behavior[DispatcherMessage] = | |
Behaviors.setup { context => | |
val pool: PoolRouter[Transformer.Request] = Routers.pool(poolSize = 4)( | |
Behaviors.supervise(Transformer()) | |
.onFailure[Exception](SupervisorStrategy.restart)) | |
val router = context.spawn(pool, "worker-pool") | |
Behaviors.receiveMessage { | |
case Line(line) => | |
router ! ConvertPlease(line, context.self) | |
Behaviors.same | |
case ConvertedLine(line) => | |
println("===> converted: " + line) | |
Behaviors.same | |
case End => | |
context.children.foreach(x => println(s"End: $x")) | |
// How to stop all actors in router, and wait all converted line been collected? | |
// The following does not work, it will only send message to ONE Transform actor, isntead of all. | |
router ! GracefulShutdown | |
Behaviors.same | |
} | |
} | |
} | |
object AkkaQuickstart { | |
def main(args: Array[String]) { | |
val greeterMain: ActorSystem[DispatcherMessage] = ActorSystem(ParentActor(), "AkkaQuickStart") | |
greeterMain ! Line("data 1") | |
greeterMain ! Line("data 2") | |
greeterMain ! Line("data 3") | |
greeterMain ! Line("data 4") | |
greeterMain ! Line("data 5") | |
greeterMain ! Line("data 6") | |
greeterMain ! Line("data 7") | |
greeterMain ! Line("data 8") | |
greeterMain ! End | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment