Last active
November 6, 2022 13:22
-
-
Save hanishi/c00e5c16c5375c0285cc08e1ac0efc57 to your computer and use it in GitHub Desktop.
Sample parent-child actors with aggregator for collecting the status of child actors.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package actors | |
import akka.actor.typed.scaladsl.Behaviors | |
import akka.actor.typed.{ActorRef, Behavior} | |
import scala.collection.immutable | |
import scala.concurrent.duration.FiniteDuration | |
import scala.reflect.ClassTag | |
//see https://github.com/akka/akka/blob/v2.7.0/akka-actor-typed-tests/src/test/scala/docs/akka/typed/Aggregator.scala | |
object Aggregator { | |
type Reply = Any | |
def apply[Reply: ClassTag, Aggregate]( | |
sendRequests: => ActorRef[Reply] => Unit, | |
replyTo: ActorRef[Aggregate], | |
aggregateReplies: immutable.IndexedSeq[Reply] => Aggregate, | |
expectedReplies: Int, | |
timeout: FiniteDuration | |
): Behavior[Command] = { | |
Behaviors.setup { context => | |
context.setReceiveTimeout(timeout, ReceiveTimeout) | |
val replyAdapter = context.messageAdapter[Reply](WrappedReply(_)) | |
sendRequests(replyAdapter) | |
def collecting(replies: immutable.IndexedSeq[Reply]): Behavior[Command] = | |
Behaviors.receiveMessagePartial { | |
case WrappedReply(reply: Reply) => | |
context.log.info("reply received") | |
val newReplies = replies :+ reply | |
if (newReplies.size == expectedReplies) { | |
val result = aggregateReplies(newReplies) | |
replyTo ! result | |
Behaviors.stopped | |
} else | |
collecting(newReplies) | |
case ReceiveTimeout => | |
context.log.info("Aggregator timed out!") | |
val aggregate = aggregateReplies(replies) | |
replyTo ! aggregate | |
Behaviors.stopped | |
} | |
collecting(Vector.empty) | |
} | |
} | |
sealed trait Command | |
private case class WrappedReply[R](reply: R) extends Command | |
private case object ReceiveTimeout extends Command | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
app.actors.stash-buffer-size=100 | |
app.actors.ask-timeout=30 | |
app.actors.termination-timeout=1 | |
app.actors.retry-exponent-base=2 | |
# app.actors.retry-count= |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package actors | |
import actors.Child.{INITIALIZING, State} | |
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer, TimerScheduler} | |
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy} | |
import akka.pattern.StatusReply | |
import play.api.Configuration | |
import java.time.LocalDateTime | |
import java.time.temporal.ChronoUnit | |
import scala.concurrent.duration.{DurationDouble, DurationInt, FiniteDuration} | |
import scala.concurrent.{ExecutionContext, Future} | |
import scala.util.chaining.scalaUtilChainingOps | |
import scala.util.{Failure, Random, Success} | |
case class Child(id: Int, | |
state: State = INITIALIZING, | |
created: LocalDateTime = LocalDateTime.now(), | |
duration: Long = 0L) { | |
def withDuration = copy(duration = ChronoUnit.SECONDS.between(created, LocalDateTime.now())) | |
def transition(state: State) = copy(state = state) | |
} | |
object Child { | |
def load(id: Int): Future[Child] = Future.successful(Child(id)) | |
sealed trait State | |
sealed trait Command | |
case class Status(replyTo: ActorRef[StatusReply[Child]]) extends Command | |
case class Bye(replyTo: ActorRef[StatusReply[Message]]) extends Command | |
case class Hello(replyTo: ActorRef[StatusReply[Message]]) extends Command | |
case class StartTask(replyTo: ActorRef[StatusReply[Message]]) extends Command | |
private case class CommandSucceeded(hi: Message) extends Command | |
private case class Retry(command: Command, count: Int) extends Command | |
private case class TaskStarted(id: Int) extends Command | |
private case class ExecuteTask(id: Int, replyTo: ActorRef[StatusReply[Message]]) extends Command | |
private case class TaskExecuted(result: Message) extends Command | |
private case class InitializationSucceeded(child: Child) extends Command | |
private case class CommandFailed(throwable: Throwable) extends Command | |
private case class InitializationFailed(id: Int, throwable: Throwable) extends Command | |
private class Actor( | |
context: ActorContext[Command], | |
buffer: StashBuffer[Command], | |
timers: TimerScheduler[Command], | |
config: Configuration | |
)(implicit ec: ExecutionContext) { | |
val EXPONENT_BASE_VALUE: Int = config.getOptional[Int](EXPONENT_BASE).getOrElse(DEFAULT_EXPONENT_BASE) | |
val TIMEOUT_DURATION: FiniteDuration = config.getOptional[Int](TERMINATION_TIME_OUT).getOrElse(DEFAULT_TERMINATION_TIME_OUT).minutes | |
val INITIAL_RETRY_COUNT: Int = config.getOptional[Int](RETRY_COUNT).getOrElse(DEFAULT_RETRY_COUNT) | |
def initializing(): Behavior[Command] = Behaviors.receiveMessage { | |
case InitializationSucceeded(child) => | |
idle(child) | |
case InitializationFailed(id, throwable) => | |
context.log.error(s"Initialization failed for $id", throwable) | |
Behaviors.stopped | |
case other => | |
buffer.stash(other) | |
Behaviors.same | |
} | |
private def responding( | |
child: Child, | |
replyTo: ActorRef[StatusReply[Message]] | |
): Behavior[Command] = | |
Behaviors.receiveMessagePartial { | |
case Status(replyTo) => | |
replyTo ! StatusReply.Success(child.withDuration) | |
Behaviors.same | |
case CommandSucceeded(message) => | |
replyTo ! StatusReply.Success(message) | |
message match { | |
case GoodBye(_) => | |
Behaviors.stopped | |
case _ => idle(child) | |
} | |
case CommandFailed(throwable) => | |
replyTo ! StatusReply.Error(throwable) | |
idle(child) | |
case other => | |
buffer.stash(other) | |
Behaviors.same | |
} | |
private def runningTask(child: Child, replyTo: ActorRef[StatusReply[Message]]): Behavior[Command] = | |
Behaviors.receiveMessagePartial { | |
case Status(replyTo) => | |
replyTo ! StatusReply.Success(child.withDuration) | |
Behaviors.same | |
case TaskStarted(id) => | |
idle(child, ExecuteTask(id, replyTo)) | |
case TaskExecuted(result) => | |
replyTo ! StatusReply.Success(result) | |
idle(child) | |
case Retry(command, count) => | |
context.log.info(s"$count retries left") | |
idle(child, command, count) | |
case CommandFailed(throwable) => | |
replyTo ! StatusReply.Error(throwable) | |
idle(child) | |
} | |
private def idle(child: Child): Behavior[Command] = { | |
if (timers.isTimerActive(TimeoutKey)) timers.cancel(TimeoutKey) | |
timers.startSingleTimer( | |
TimeoutKey, | |
Terminate, | |
TIMEOUT_DURATION | |
) | |
buffer.unstashAll(active(child.transition(READY))) | |
} | |
private def idle(child: Child, command: Command, count: Int = INITIAL_RETRY_COUNT): Behavior[Command] = { | |
if (timers.isTimerActive(TimeoutKey)) timers.cancel(TimeoutKey) | |
if (count < INITIAL_RETRY_COUNT) { | |
val delay = Math.pow(EXPONENT_BASE_VALUE, INITIAL_RETRY_COUNT - count).seconds | |
context.log.debug(s"retrying $command in $delay seconds") | |
timers.startSingleTimer(command, delay) | |
} else context.self ! command | |
active(child, count) | |
} | |
private def active(child: Child, retryCount: Int = INITIAL_RETRY_COUNT): Behavior[Command] = | |
Behaviors.receiveMessagePartial { | |
case Status(replyTo) => | |
replyTo ! StatusReply.Success(child.withDuration) | |
Behaviors.same | |
case Hello(replyTo) => | |
hello().pipe(context.pipeToSelf) { | |
case Success(hi) => | |
CommandSucceeded(hi) | |
case Failure(throwable) => | |
CommandFailed(throwable) | |
} | |
responding(child.transition(WAITING), replyTo) | |
case command@StartTask(replyTo) => | |
startTask().pipe(context.pipeToSelf) { | |
case Success(id) => | |
TaskStarted(id) | |
case Failure(throwable) => | |
if (retryCount > 0) Retry(command, retryCount - 1) | |
else CommandFailed(throwable) | |
} | |
runningTask(child.transition(RUNNING), replyTo) | |
case command@ExecuteTask(id, replyTo) => | |
executeTask(id).pipe(context.pipeToSelf) { | |
case Success(result) => | |
TaskExecuted(result) | |
case Failure(throwable) => | |
if (retryCount > 0) Retry(command, retryCount - 1) | |
else CommandFailed(throwable) | |
} | |
runningTask(child.transition(RUNNING), replyTo) | |
case Bye(replyTo) => | |
bye().pipe(context.pipeToSelf) { | |
case Success(goodbye) => | |
CommandSucceeded(goodbye) | |
case Failure(throwable) => | |
CommandFailed(throwable) | |
} | |
responding(child.transition(WAITING), replyTo) | |
case Terminate => | |
context.log.debug(s"terminating actor for child: ${child.id}") | |
Behaviors.stopped | |
} | |
private def hello(): Future[Hi] = | |
Future{Hi("Hello my friend!")} | |
private def startTask(): Future[Int] = Future {Random.nextInt(Int.MaxValue)} | |
private def executeTask(id: Int): Future[TaskResult] = | |
Future { | |
Thread.sleep(15000) | |
TaskResult(Random.nextLong(Long.MaxValue)) | |
} | |
// Future.failed(new RuntimeException("BOOM!") | |
private def bye(): Future[GoodBye] = Future{GoodBye("Bye my friend!")} | |
} | |
case object INITIALIZING extends State { | |
override def toString: String = this.productPrefix | |
} | |
case object READY extends State { | |
override def toString: String = this.productPrefix | |
} | |
case object RUNNING extends State { | |
override def toString: String = this.productPrefix | |
} | |
case object WAITING extends State { | |
override def toString: String = this.productPrefix | |
} | |
object Actor { | |
def apply(id: Int, configuration: Configuration)(implicit ec: ExecutionContext): Behavior[Command] = | |
Behaviors | |
.supervise[Command]( | |
Behaviors.withStash(configuration.getOptional[Int](STASH_BUFFER_SIZE) | |
.getOrElse(DEFAULT_STASH_BUFFER_SIZE)) { | |
buffer => | |
Behaviors.setup { context => | |
Behaviors.withTimers { timers => | |
load(id).pipe(context.pipeToSelf) { | |
case Success(child) => InitializationSucceeded(child) | |
case Failure(throwable) => | |
InitializationFailed(id, throwable) | |
} | |
new Actor( | |
context, | |
buffer, | |
timers, | |
configuration | |
).initializing() | |
} | |
} | |
} | |
) | |
.onFailure(SupervisorStrategy.stop) | |
} | |
private case object Terminate extends Command | |
private case object TimeoutKey | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import actors.Parent | |
import com.google.inject.AbstractModule | |
import play.api.libs.concurrent.AkkaGuiceSupport | |
class Module extends AbstractModule with AkkaGuiceSupport { | |
override def configure(): Unit = | |
bindTypedActor(Parent.Actor, "parent") | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package object actors { | |
val STASH_BUFFER_SIZE = "app.actors.stash-buffer-size" | |
val ASK_TIME_OUT = "app.actors.ask-timeout" | |
val DEFAULT_ASK_TIMEOUT = 30 | |
val DEFAULT_STASH_BUFFER_SIZE = 100 | |
val TERMINATION_TIME_OUT = "app.actors.termination-timeout" | |
val DEFAULT_TERMINATION_TIME_OUT = 1 | |
val EXPONENT_BASE = "app.actors.retry-exponent-base" | |
val DEFAULT_EXPONENT_BASE = 2 | |
val RETRY_COUNT = "app.actors.retry-count" | |
val DEFAULT_RETRY_COUNT = 3 | |
trait Message | |
case class Hi(message: String) extends Message | |
case class GoodBye(message: String) extends Message | |
case class TaskResult(result: Long) extends Message | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package actors | |
import actors.Parent.Command | |
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer} | |
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy} | |
import akka.pattern.StatusReply | |
import akka.util.Timeout | |
import com.google.inject.Provides | |
import play.api.Configuration | |
import play.api.libs.concurrent.ActorModule | |
import scala.collection.Seq | |
import scala.concurrent.duration.{DurationInt, FiniteDuration} | |
import scala.concurrent.{ExecutionContext, Future} | |
import scala.util.chaining.scalaUtilChainingOps | |
import scala.util.{Failure, Random, Success} | |
case class Parent(children: Seq[Child]) | |
extends Command | |
object Parent { | |
val aggregationTimeout: FiniteDuration = 1.second | |
def initialize(numberOfChildren: Int): Future[Seq[Int]] = Future.successful(Seq.fill(numberOfChildren)(Random.nextInt(Int.MaxValue))) | |
sealed trait Command | |
sealed trait Request[T] extends Command { | |
def id: Int | |
def replyTo: ActorRef[StatusReply[T]] | |
} | |
case class Aggregated(children: Seq[Child], replyTo: ActorRef[StatusReply[StatusResult]]) extends Command | |
case class StatusResult(children: Seq[Child]) | |
case class Status(id: Int, replyTo: ActorRef[StatusReply[StatusResult]]) extends Request[StatusResult] | |
case class StartTask(id: Int, replyTo: ActorRef[StatusReply[Message]]) extends Request[Message] | |
case class Hello(id: Int, replyTo: ActorRef[StatusReply[Message]]) extends Request[Message] | |
case class Bye(id: Int, replyTo: ActorRef[StatusReply[Message]]) extends Request[Message] | |
private case class InitializationFailed(throwable: Throwable) extends Command | |
private case class InitializationSucceeded(ids: Seq[Int]) extends Command | |
private case class ChildConfirmed[T](child: Child, request: Request[T]) extends Command | |
private case class ChildTerminated(id: Int) extends Command | |
private case class RequestSucceeded(response: Message, replyTo: ActorRef[StatusReply[Message]]) extends Command | |
private case class RequestFailed[T](throwable: Throwable, request: Request[T]) extends Command | |
private class Actor(configuration: Configuration, | |
context: ActorContext[Command], | |
buffer: StashBuffer[Command])(implicit timeout: Timeout, ec: ExecutionContext) { | |
def initializing(actorRefs: Map[Int, ActorRef[Child.Command]] = Map.empty): Behavior[Command] = | |
Behaviors.receiveMessage { | |
case InitializationSucceeded(ids: Seq[Int]) => | |
buffer.unstashAll(active(ids.foldLeft(actorRefs) { (actorRefs, id) => | |
actorRefs + (id -> spawnChild(id)) | |
})) | |
case InitializationFailed(throwable) => | |
context.log.error("Initialization Failed") | |
throw throwable | |
case other => | |
buffer.stash(other) | |
Behaviors.same | |
} | |
def idle(actorRefs: Map[Int, ActorRef[Child.Command]], actorRef: ActorRef[Child.Command]): Behavior[Command] = | |
Behaviors.receiveMessage { | |
case Status(id, replyTo) => | |
val numberOfChildren = actorRefs.size | |
if (numberOfChildren > 0) | |
context.spawnAnonymous( | |
Aggregator[StatusReply[Child], Aggregated]( | |
self => actorRefs.values.foreach(child => child ! Child.Status(self)), | |
context.self, | |
replies => Aggregated(replies.map(_.getValue), replyTo), | |
numberOfChildren, | |
aggregationTimeout | |
) | |
) | |
else context.self ! Aggregated(Seq.empty, replyTo) | |
Behaviors.same | |
case Aggregated(result, replyTo) => | |
replyTo ! StatusReply.Success(StatusResult(result)) | |
Behaviors.same | |
case ChildConfirmed(child@Child(id, _, _, _), request) => | |
context.log.info(s"created $child") | |
context.self ! request | |
idle(actorRefs + (id -> actorRef)) | |
case RequestSucceeded(value, replyTo) => | |
replyTo ! StatusReply.Success(value) | |
idle(actorRefs) | |
case RequestFailed(throwable, request) => | |
context.stop(actorRef) | |
request.replyTo ! StatusReply.Error(throwable) | |
idle(actorRefs - request.id) | |
case other => | |
buffer.stash(other) | |
Behaviors.same | |
} | |
def active(actorRefs: Map[Int, ActorRef[Child.Command]]): Behavior[Command] = | |
Behaviors.receiveMessagePartial { | |
case Status(id, replyTo) => | |
val numberOfChildren = actorRefs.size | |
if (numberOfChildren > 0) | |
context.spawnAnonymous( | |
Aggregator[StatusReply[Child], Aggregated]( | |
self => actorRefs.values.foreach(child => child ! Child.Status(self)), | |
context.self, | |
replies => Aggregated(replies.map(_.getValue), replyTo), | |
numberOfChildren, | |
aggregationTimeout | |
) | |
) | |
else context.self ! Aggregated(Seq.empty, replyTo) | |
Behaviors.same | |
case Aggregated(result, replyTo) => | |
replyTo ! StatusReply.Success(StatusResult(result)) | |
Behaviors.same | |
case request@Hello(id, replyTo) => | |
context.log.debug(s"hello received for child: $id") | |
handleRequest(request, actorRefs) { child => | |
context.askWithStatus(child, Child.Hello) { | |
case Success(message) => | |
RequestSucceeded(message, replyTo) | |
case Failure(throwable) => | |
RequestFailed(throwable, request) | |
} | |
idle(actorRefs, child) | |
} | |
case request@Bye(id, replyTo) => | |
context.log.debug(s"bye received for child: $id") | |
handleRequest(request, actorRefs) { child => | |
context.askWithStatus(child, Child.Bye) { | |
case Success(message) => | |
RequestSucceeded(message, replyTo) | |
case Failure(throwable) => | |
RequestFailed(throwable, request) | |
} | |
idle(actorRefs, child) | |
} | |
case request@StartTask(id, replyTo) => | |
context.log.debug(s"start task received for child: $id") | |
handleRequest(request, actorRefs) { child => | |
context.askWithStatus(child, Child.StartTask) { | |
case Success(message) => | |
RequestSucceeded(message, replyTo) | |
case Failure(throwable) => | |
RequestFailed(throwable, request) | |
} | |
idle(actorRefs, child) | |
} | |
case ChildTerminated(id) => | |
context.log.info(s"child with $id terminated") | |
idle(actorRefs - id) | |
} | |
def spawnChild(id: Int): ActorRef[Child.Command] = { | |
val actorRef = context.spawnAnonymous(Child.Actor(id, configuration)) | |
context.watchWith(actorRef, ChildTerminated(id)) | |
actorRef | |
} | |
def handleRequest[T](request: Request[T], actorRefs: Map[Int, ActorRef[Child.Command]])( | |
behavior: ActorRef[Child.Command] => Behavior[Command] | |
): Behavior[Command] = | |
actorRefs | |
.get(request.id) | |
.fold(loadChild[T](request, actorRefs))(behavior) | |
private def idle(actorRefs: Map[Int, ActorRef[Child.Command]]): Behavior[Command] = { | |
context.log.info(s"current children: ${actorRefs}") | |
buffer.unstashAll(active(actorRefs)) | |
} | |
private def loadChild[T](request: Request[T], actorRefs: Map[Int, ActorRef[Child.Command]]): Behavior[Command] = { | |
context.log.info(s"creating child for ${request.id}") | |
val child = context.spawnAnonymous(Child.Actor(request.id, configuration)) | |
context.watchWith(child, ChildTerminated(request.id)) | |
confirm(request, actorRefs, child) | |
} | |
private def confirm[T](request: Request[T], | |
actorRefs: Map[Int, ActorRef[Child.Command]], | |
actorRef: ActorRef[Child.Command]): Behavior[Command] = { | |
context.askWithStatus(actorRef, Child.Status) { | |
case Success(child) => | |
ChildConfirmed(child, request) | |
case Failure(throwable) => | |
RequestFailed(throwable, request) | |
} | |
idle(actorRefs, actorRef) | |
} | |
} | |
object Actor extends ActorModule { | |
override type Message = Command | |
@Provides def apply(configuration: Configuration)(implicit ec: ExecutionContext): Behavior[Command] = Behaviors | |
.supervise[Command]( | |
Behaviors.withStash(configuration.get[Int](STASH_BUFFER_SIZE)) { | |
buffer => | |
Behaviors | |
.setup { context => | |
Behaviors.logMessages { | |
implicit val timeout: Timeout = | |
configuration.getOptional[Int](ASK_TIME_OUT).getOrElse(DEFAULT_ASK_TIMEOUT).seconds | |
initialize(4).pipe(context.pipeToSelf) { | |
case Success(ids) => | |
InitializationSucceeded(ids) | |
case Failure(throwable) => | |
InitializationFailed(throwable) | |
} | |
new Actor(configuration, context, buffer).initializing() | |
} | |
} | |
} | |
).onFailure(SupervisorStrategy.restart) | |
} | |
} | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package models | |
import actors.Parent._ | |
import actors.{ASK_TIME_OUT, Message} | |
import akka.actor.typed.scaladsl.AskPattern.Askable | |
import akka.actor.typed.{ActorRef, Scheduler} | |
import akka.util.Timeout | |
import play.api.Configuration | |
import javax.inject.Inject | |
import scala.concurrent.{ExecutionContext, Future} | |
import scala.concurrent.duration.DurationInt | |
class ParentChild @Inject()(configuration: Configuration, | |
parent: ActorRef[Command])(implicit scheduler: Scheduler, ex: ExecutionContext) { | |
implicit val timeout: Timeout = configuration.get[Int](ASK_TIME_OUT).seconds | |
def hello(id: Int): Future[Message] = parent.askWithStatus[Message](Hello(id, _)) | |
def bye(id: Int): Future[Message] = parent.askWithStatus(Bye(id, _)) | |
def startTask(id: Int): Future[Message] = parent.askWithStatus(StartTask(id, _)) | |
def status(id: Int): Future[StatusResult] = parent.askWithStatus(Status(id, _)) | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package controllers | |
import actors.Parent.StatusResult | |
import com.google.inject.Singleton | |
import actors.{Child, GoodBye, Hi, TaskResult} | |
import akka.actor.typed.Scheduler | |
import play.api.libs.json.Json | |
import play.api.mvc.{AbstractController, Action, AnyContent, ControllerComponents} | |
import usecase.ParentChild | |
import java.time.format.DateTimeFormatter | |
import javax.inject.Inject | |
import scala.concurrent.ExecutionContext | |
@Singleton class ParentChildController @Inject()(cc: ControllerComponents, parentChild: ParentChild) | |
(implicit ec: ExecutionContext, scheduler: Scheduler) | |
extends AbstractController(cc) { | |
val dateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss") | |
def hello(id: Int): Action[AnyContent] = Action.async { | |
parentChild.hello(id).map { | |
case Hi(message) => Ok(Json.toJson(Json.obj("id" -> id, "message" -> message))) | |
}.recover { | |
case e: Throwable => | |
e.printStackTrace() | |
InternalServerError(Json.toJson(e.getMessage)) | |
} | |
} | |
def bye(id: Int): Action[AnyContent] = Action.async { | |
parentChild.bye(id).map { | |
case GoodBye(message) => Ok(Json.toJson(Json.obj("id" -> id, "message" -> message))) | |
}.recover { | |
case e: Throwable => | |
e.printStackTrace() | |
InternalServerError(Json.toJson(e.getMessage)) | |
} | |
} | |
def startTask(id: Int): Action[AnyContent] = Action.async { | |
parentChild.startTask(id).map { | |
case TaskResult(result) => Ok(Json.toJson(Json.obj("id" -> id, "result" -> result))) | |
}.recover { | |
case e: Throwable => | |
e.printStackTrace() | |
InternalServerError(Json.toJson(e.getMessage)) | |
} | |
} | |
def status(id: Int) = Action.async { | |
// id is not used | |
parentChild.status(id).map{ | |
case StatusResult(children) => | |
Ok(Json.toJson(children.map(child => | |
Json.obj("id" -> child.id, "created" -> child.created.format(dateTimeFormatter), | |
"duration" -> s"${child.duration} seconds", | |
"state" -> child.state.toString)))) | |
} | |
} | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
GET /parent_child/:id/hello controllers.ParentChildController.hello(id: Int) | |
GET /parent_child/:id/bye controllers.ParentChildController.bye(id: Int) | |
GET /parent_child/:id/startTask controllers.ParentChildController.startTask(id: Int) | |
GET /parent_child/:id/status controllers.ParentChildController.status(id: Int) // id not used |
Author
hanishi
commented
Nov 4, 2022
•
- it demonstrates a sample Parent-Child structure.
- The parent uses Aggregator to collect the status of the children which have been spawned.
- Child actors terminate themselves when they have not been used for a while, which is configurable
- Child actors can retry the received request using exponential backoff upon failing to succeed.
Where Future.successful
is used should be replaced with your actual implementation that runs on a different thread. Otherwise, the method will be executed on the calling thread, blocking the actor from transitioning to its subsequent behavior until the method returns
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment