Last active
November 20, 2023 09:18
-
-
Save hanishi/239036c97225d149039216fe68985781 to your computer and use it in GitHub Desktop.
Parent Child using Akka Typed
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example | |
import akka.Done | |
import akka.actor.typed.scaladsl.{ | |
ActorContext, | |
Behaviors, | |
StashBuffer, | |
TimerScheduler | |
} | |
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy} | |
import akka.pattern.StatusReply | |
import play.api.Configuration | |
import scala.concurrent.duration.{DurationInt, FiniteDuration} | |
import scala.concurrent.{ExecutionContext, Future} | |
import scala.util.chaining.scalaUtilChainingOps | |
import scala.util.{Failure, Success} | |
/** A child entity, identified by `id`. */
case class Child(id: String)

/** Message protocol and behavior factory for the actor that owns a single [[Child]].
  *
  * NOTE(review): `STASH_BUFFER_SIZE` and `IDLE_TIMEOUT` are declared in
  * `package object models`, while this file declares `package example` and does not
  * import them — confirm the intended package.
  */
object Child {

  /** Loads the child for `id`. In this sample it completes immediately with `Child(id)`. */
  def load(id: String): Future[Child] = Future.successful(Child(id))

  /** Every message the child actor can receive. */
  sealed trait Command

  /** Asks for the loaded child. */
  case class Get(replyTo: ActorRef[StatusReply[Child]]) extends Command

  /** Asks the actor to delete its child; on success the actor replies and stops. */
  case class Delete(replyTo: ActorRef[StatusReply[Child]]) extends Command

  /** Asks the child for a greeting. */
  case class Hello(
      replyTo: ActorRef[StatusReply[Hi]]
  ) extends Command

  // Internal messages: results of futures piped back to the actor itself.
  private case class HelloResponded(hi: Hi) extends Command
  private case class Loaded(child: Child) extends Command
  private case class CommandFailed(throwable: Throwable) extends Command
  private case class InitializationFailed(throwable: Throwable) extends Command

  /** State machine of the child actor.
    *
    * States: `initializing` -> `active` (entered through `idle`) -> `greeting` /
    * `deleting` -> back to `active`, or stopped.
    *
    * @param context     actor context, used for logging and `pipeToSelf`
    * @param buffer      stash for commands that arrive while the actor is busy
    * @param timers      scheduler driving the idle timeout
    * @param idleTimeout delay after which an idle actor receives [[Unload]]
    */
  private class Actor(
      context: ActorContext[Command],
      buffer: StashBuffer[Command],
      timers: TimerScheduler[Command],
      idleTimeout: FiniteDuration
  ) {

    /** Waits for the result of `load`, stashing everything else in the meantime. */
    def initializing(): Behavior[Command] = Behaviors.receiveMessage {
      case Loaded(child) =>
        idle(child)
      case InitializationFailed(throwable) =>
        context.log.error("Initialization failed.", throwable)
        Behaviors.stopped
      case other =>
        buffer.stash(other)
        Behaviors.same
    }

    /** Waits for the outcome of `hello()` and forwards it to `replyTo`. */
    private def greeting(
        child: Child,
        replyTo: ActorRef[StatusReply[Hi]]
    ): Behavior[Command] =
      Behaviors.receiveMessage {
        case HelloResponded(hi) =>
          replyTo ! StatusReply.Success(hi)
          idle(child)
        case CommandFailed(throwable) =>
          replyTo ! StatusReply.Error(throwable)
          idle(child)
        case Unload =>
          // The idle timer fired while a request is in flight. Stashing it would let
          // idle() replay it right after the reply, stopping the actor and dropping
          // every command stashed behind it. idle() re-arms the timer, so drop it.
          Behaviors.same
        case other =>
          buffer.stash(other)
          Behaviors.same
      }

    /** Waits for the outcome of `delete()`; stops the actor once the delete succeeded. */
    private def deleting(
        child: Child,
        replyTo: ActorRef[StatusReply[Child]]
    ): Behavior[Command] =
      Behaviors.receiveMessage {
        case Deleted =>
          replyTo ! StatusReply.Success(child)
          Behaviors.stopped
        case CommandFailed(throwable) =>
          replyTo ! StatusReply.Error(throwable)
          idle(child)
        case Unload =>
          // Same reasoning as in greeting(): never replay a stale idle timeout.
          Behaviors.same
        case other =>
          buffer.stash(other)
          Behaviors.same
      }

    /** (Re)arms the idle timer and replays stashed commands against `active`. */
    private def idle(child: Child): Behavior[Command] = {
      // Starting a timer under an existing key replaces the previous timer, so no
      // explicit isTimerActive/cancel is needed first.
      timers.startSingleTimer(TimeoutKey, Unload, idleTimeout)
      buffer.unstashAll(active(child))
    }

    /** Serves requests; long-running ones move the actor into a dedicated waiting state. */
    private def active(child: Child): Behavior[Command] =
      Behaviors.receiveMessagePartial {
        case Get(replyTo) =>
          replyTo ! StatusReply.Success(child)
          Behaviors.same
        case Hello(replyTo) =>
          hello().pipe(context.pipeToSelf) {
            case Success(hi) =>
              HelloResponded(hi)
            case Failure(throwable) =>
              CommandFailed(throwable)
          }
          greeting(child, replyTo)
        case Delete(replyTo) =>
          delete().pipe(context.pipeToSelf) {
            case Success(_) =>
              Deleted
            case Failure(throwable) =>
              CommandFailed(throwable)
          }
          deleting(child, replyTo)
        case Unload =>
          context.log.info(s"unloading $child")
          Behaviors.stopped
      }

    /** Produces the greeting; completes immediately in this sample. */
    private def hello(): Future[Hi] =
      Future.successful(Hi("Hello my friend!"))

    /** Performs the delete; completes immediately in this sample. */
    private def delete(): Future[Done] = Future.successful(Done)
  }

  /** Sent to self when `delete()` succeeded. */
  case object Deleted extends Command

  /** Sent by the idle timer; stops the actor when it is handled in the `active` state. */
  case object Unload extends Command

  object Actor {

    /** Creates the supervised behavior of a child actor.
      *
      * The stash capacity is read from `STASH_BUFFER_SIZE` and the idle timeout, in
      * minutes, from `IDLE_TIMEOUT`. Loading starts as soon as the behavior is set up
      * and its result is piped back to the actor. Any failure stops the actor.
      *
      * @param id            identifier of the child to load
      * @param configuration source of the stash size and idle timeout settings
      * @param ec            execution context; not used by the visible body
      */
    def apply(id: String, configuration: Configuration)(implicit
        ec: ExecutionContext
    ): Behavior[Command] =
      Behaviors
        .supervise[Command](
          Behaviors.withStash(configuration.get[Int](STASH_BUFFER_SIZE)) {
            buffer =>
              Behaviors.setup { context =>
                Behaviors.withTimers { timers =>
                  load(id).pipe(context.pipeToSelf) {
                    case Success(child) => Loaded(child)
                    case Failure(throwable) =>
                      InitializationFailed(throwable)
                  }
                  new Actor(
                    context,
                    buffer,
                    timers,
                    configuration.get[Int](IDLE_TIMEOUT).minutes
                  ).initializing()
                }
              }
          }
        )
        .onFailure(SupervisorStrategy.stop)
  }

  /** Key under which the idle timer is registered. */
  private case object TimeoutKey
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Greeting returned in reply to a hello request.
  *
  * @param message text of the greeting
  */
case class Hi(message: String)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Guice module that binds the parent typed actor under the name `parentActor`. */
class Module extends AbstractModule with AkkaGuiceSupport {

  /** Registers `Parent.Actor` as a typed actor binding. */
  override def configure(): Unit =
    bindTypedActor(Parent.Actor, "parentActor")
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Configuration keys read by the actors and the service. */
package object models {

  /** Key of the stash capacity (read as an `Int`). */
  val STASH_BUFFER_SIZE: String = "actor.stash-buffer-size"

  /** Key of the ask timeout (read as an `Int` number of seconds). */
  val TIMEOUT: String = "actor.future-timeout"

  /** Key of the child idle timeout (read as an `Int` number of minutes). */
  val IDLE_TIMEOUT: String = "actor.idle-timeout"
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example | |
import akka.actor.typed.scaladsl.{ActorContext, Behaviors, StashBuffer} | |
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy} | |
import akka.pattern.StatusReply | |
import akka.util.Timeout | |
import com.google.inject.Provides | |
import models.Parent.Command | |
import play.api.Configuration | |
import play.api.libs.concurrent.ActorModule | |
import scala.concurrent.duration.DurationInt | |
import scala.concurrent.{ExecutionContext, Future} | |
import scala.util.chaining.scalaUtilChainingOps | |
import scala.util.{Failure, Success} | |
/** State of the parent actor: the running child actors, keyed by child id.
  *
  * NOTE(review): this state class also extends `Command`, although nothing visible
  * here sends it as a message — confirm whether that is intentional.
  */
case class Parent(children: Map[String, ActorRef[Child.Command]] = Map.empty)
    extends Command

/** Message protocol and behavior factory for the parent actor, which spawns one child
  * actor per id on demand and routes requests to it.
  *
  * NOTE(review): this file declares `package example` but imports
  * `models.Parent.Command` and uses constants from `package object models` —
  * confirm the intended package.
  */
object Parent {

  /** Start-up hook; completes immediately in this sample. */
  def initialize: Future[String] = Future.successful("Initialized OK")

  /** Every message the parent actor can receive. */
  sealed trait Command

  /** A request that targets the child `id` and expects a reply of type `A`. */
  sealed trait Request[A] extends Command {
    def id: String
    def replyTo: ActorRef[StatusReply[A]]
  }

  case class CreateChild(
      id: String,
      replyTo: ActorRef[StatusReply[Child]]
  ) extends Request[Child]

  case class DeleteChild(
      id: String,
      replyTo: ActorRef[StatusReply[Child]]
  ) extends Request[Child]

  case class Hello(
      id: String,
      replyTo: ActorRef[StatusReply[Hi]]
  ) extends Request[Hi]

  // Internal messages: adapted ask responses and lifecycle notifications.
  private case class HiReceived(
      hi: Hi,
      replyTo: ActorRef[StatusReply[Hi]]
  ) extends Command

  private final case class Initialized(message: String) extends Command

  private final case class InitializationFailed(
      throwable: Throwable
  ) extends Command

  private final case class RequestFailed[A](
      throwable: Throwable,
      request: Request[A]
  ) extends Command

  private case class ChildDeleted(
      child: Child,
      replyTo: ActorRef[StatusReply[Child]]
  ) extends Command

  private case class ChildLoaded[A](
      child: Child,
      request: Request[A]
  ) extends Command

  /** Watch notification. Carries the terminated ref so that a late notification from
    * an old actor cannot evict a newer child registered under the same id.
    */
  private case class ChildTerminated(
      id: String,
      ref: ActorRef[Child.Command]
  ) extends Command

  /** State machine of the parent actor: `initializing` -> `active` (entered through
    * `idle`) -> `awaiting` a child's answer -> back to `active`.
    *
    * @param configuration passed on to every spawned child
    * @param context       actor context (spawning, watching, asking, logging)
    * @param buffer        stash for commands that arrive while the actor is busy
    * @param timeout       ask timeout for calls to children
    */
  private class Actor(
      configuration: Configuration,
      context: ActorContext[Command],
      buffer: StashBuffer[Command]
  )(implicit timeout: Timeout, ec: ExecutionContext) {

    /** Waits for the result of `initialize`, stashing everything else in the meantime. */
    def initializing(): Behavior[Command] =
      Behaviors.receiveMessage {
        case Initialized(message) =>
          context.log.info(message)
          idle()
        case InitializationFailed(throwable) =>
          // Log the cause as well; rethrowing hands the failure to the supervisor.
          context.log.error("Initialization Failed", throwable)
          throw throwable
        case other =>
          buffer.stash(other)
          Behaviors.same
      }

    /** Waits for the answer of `actorRef` to a pending ask, stashing everything else.
      *
      * @param parent   current state
      * @param actorRef the child that was asked
      */
    private def awaiting(
        parent: Parent,
        actorRef: ActorRef[Child.Command]
    ): Behavior[Command] =
      Behaviors.receiveMessage {
        case ChildLoaded(child @ Child(id), request) =>
          if (!parent.children.contains(id)) {
            // First sight of this child: register it, then replay the original
            // request so it is served with the child in place.
            context.log.info(s"created $child")
            context.self ! request
            idle(parent.copy(children = parent.children + (id -> actorRef)))
          } else {
            request.replyTo ! StatusReply.Success(child)
            idle(parent)
          }
        case HiReceived(value, replyTo) =>
          replyTo ! StatusReply.Success(value)
          idle(parent)
        case ChildDeleted(child @ Child(id), replyTo) =>
          context.log.info(s"deleted child with $id")
          context.stop(actorRef)
          replyTo ! StatusReply.Success(child)
          idle(parent.copy(children = parent.children - id))
        case RequestFailed(throwable, request) =>
          context.log.error(
            s"failed to load child for ${request.id}",
            throwable
          )
          context.stop(actorRef)
          request.replyTo ! StatusReply.Error(throwable)
          // The actor was just stopped, so its entry must go whether or not it was
          // registered (removing an absent key is a no-op). The previous check was
          // inverted and kept the stale ref of a stopped child in the map.
          idle(parent.copy(children = parent.children - request.id))
        case other =>
          buffer.stash(other)
          Behaviors.same
      }

    /** Serves incoming requests by delegating to the child with the requested id. */
    private def active(parent: Parent = Parent()): Behavior[Command] =
      Behaviors.receiveMessagePartial {
        case request @ CreateChild(id, _) =>
          context.log.info(s"create child request for $id")
          handleRequest(parent, request)(loadingChild(parent, request, _))
        case request @ DeleteChild(id, replyTo) =>
          context.log.info(s"delete child request for $id")
          handleRequest(parent, request) { childRef =>
            context.askWithStatus(childRef, Child.Delete) {
              case Success(deleted) =>
                ChildDeleted(deleted, replyTo)
              case Failure(throwable) =>
                RequestFailed(throwable, request)
            }
            awaiting(parent, childRef)
          }
        case request @ Hello(id, replyTo) =>
          handleRequest(parent, request) { childRef =>
            context.askWithStatus(childRef, Child.Hello) {
              case Success(value) =>
                HiReceived(value, replyTo)
              case Failure(throwable) =>
                RequestFailed(throwable, request)
            }
            awaiting(parent, childRef)
          }
        case ChildTerminated(id, ref) =>
          context.log.info(s"child with $id terminated")
          // Forget the entry only if it still points at the actor that terminated;
          // a replacement may already be registered under the same id.
          val remaining =
            if (parent.children.get(id).contains(ref)) parent.children - id
            else parent.children
          idle(parent.copy(children = remaining))
      }

    /** Runs `behavior` with the registered child for `request.id`, or spawns and loads
      * a new child when none is registered.
      */
    private def handleRequest[A](parent: Parent, request: Request[A])(
        behavior: ActorRef[Child.Command] => Behavior[Command]
    ): Behavior[Command] =
      parent.children
        .get(request.id)
        .fold(loadChild(parent, request))(behavior(_))

    /** Logs the current children and replays stashed commands against `active`. */
    private def idle(parent: Parent = Parent()): Behavior[Command] = {
      context.log.info(s"current children: ${parent.children}")
      buffer.unstashAll(active(parent))
    }

    /** Spawns and watches a new child for `request.id`, then waits for it to load. */
    private def loadChild[A](
        parent: Parent,
        request: Request[A]
    ): Behavior[Command] = {
      context.log.info(s"creating child for ${request.id}")
      val child = context.spawnAnonymous(Child.Actor(request.id, configuration))
      context.watchWith(child, ChildTerminated(request.id, child))
      loadingChild(parent, request, child)
    }

    /** Asks `actorRef` for its child and waits for the answer. */
    private def loadingChild[A](
        parent: Parent,
        request: Request[A],
        actorRef: ActorRef[Child.Command]
    ): Behavior[Command] = {
      context.askWithStatus(actorRef, Child.Get) {
        case Success(child) =>
          ChildLoaded(child, request)
        case Failure(throwable) =>
          RequestFailed(throwable, request)
      }
      awaiting(parent, actorRef)
    }
  }

  object Actor extends ActorModule {
    override type Message = Parent.Command

    /** Creates the supervised behavior of the parent actor.
      *
      * The stash capacity is read from `STASH_BUFFER_SIZE` and the ask timeout, in
      * seconds, from `TIMEOUT`. `initialize` starts as soon as the behavior is set up
      * and its result is piped back to the actor. Failures restart the actor.
      */
    @Provides
    def apply(
        configuration: Configuration
    )(implicit ec: ExecutionContext): Behavior[Command] =
      Behaviors
        .supervise[Command](
          Behaviors.withStash(configuration.get[Int](STASH_BUFFER_SIZE)) {
            buffer =>
              Behaviors
                .setup { context =>
                  Behaviors.logMessages {
                    implicit val timeout: Timeout =
                      configuration.get[Int](TIMEOUT).seconds
                    initialize.pipe(context.pipeToSelf) {
                      case Success(value) =>
                        Initialized(value)
                      case Failure(throwable) =>
                        InitializationFailed(throwable)
                    }
                    new Actor(configuration, context, buffer).initializing()
                  }
                }
          }
        )
        .onFailure(SupervisorStrategy.restart)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package controllers | |
import akka.actor.typed.Scheduler | |
import com.google.inject.Singleton | |
import models.{Child, Hi} | |
import play.api.libs.json.Json | |
import play.api.mvc.{AbstractController, ControllerComponents} | |
import service.{Error, ParentChildService} | |
import javax.inject.Inject | |
import scala.concurrent.ExecutionContext | |
/** HTTP endpoints that create, delete and greet children through [[ParentChildService]].
  *
  * Every endpoint answers with a JSON body; a failed service call is logged and turned
  * into a 500 response carrying the error message.
  */
@Singleton class ParentChildController @Inject() (
    cc: ControllerComponents,
    service: ParentChildService
)(implicit ec: ExecutionContext, scheduler: Scheduler)
    extends AbstractController(cc) {

  import scala.util.control.NonFatal

  private val logger = play.api.Logger(getClass)

  /** Creates (or looks up) the child `id` and returns its id. */
  def create(id: String) = Action.async {
    service
      .createChild(id)
      .map(child => Ok(Json.obj("id" -> child.id)))
      .recover(serverError(s"create child $id"))
  }

  /** Deletes the child `id` and returns the id of the deleted child. */
  def delete(id: String) = Action.async {
    service
      .deleteChild(id)
      .map(child => Ok(Json.obj("id" -> child.id)))
      .recover(serverError(s"delete child $id"))
  }

  /** Asks the child `id` for a greeting and returns it together with the id. */
  def greet(id: String) = Action.async {
    service
      .greet(id)
      .map(hi => Ok(Json.obj("id" -> id, "message" -> hi.message)))
      .recover(serverError(s"greet child $id"))
  }

  /** Logs a non-fatal failure of `operation` and maps it to a 500 JSON response. */
  private def serverError(
      operation: String
  ): PartialFunction[Throwable, play.api.mvc.Result] = { case NonFatal(e) =>
    logger.error(s"failed to $operation", e)
    // getMessage may be null; fall back to the exception's string form.
    val message = Option(e.getMessage).getOrElse(e.toString)
    InternalServerError(Json.toJson(Error(message)))
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Thin facade over the parent actor: every call is an ask that completes with the
  * actor's reply, or fails with the error it reported or with an ask timeout.
  *
  * @param configuration source of the `TIMEOUT` setting, read as a number of seconds
  * @param parent        reference to the parent actor
  */
class ParentChildService @Inject() (
    configuration: Configuration,
    parent: ActorRef[Parent.Command]
)(implicit scheduler: Scheduler, ex: ExecutionContext) {

  /** Configured timeout as a duration. */
  implicit val duration: FiniteDuration =
    configuration.get[Int](TIMEOUT).seconds

  /** Ask timeout applied to every request sent to the parent actor. */
  implicit val timeout: Timeout = Timeout(duration)

  /** Asks the child `id` for a greeting. */
  def greet(id: String): Future[Hi] =
    parent.askWithStatus(replyTo => Parent.Hello(id, replyTo))

  /** Creates the child `id`. */
  def createChild(id: String): Future[Child] =
    parent.askWithStatus(replyTo => Parent.CreateChild(id, replyTo))

  /** Deletes the child `id`. */
  def deleteChild(id: String): Future[Child] =
    parent.askWithStatus(replyTo => Parent.DeleteChild(id, replyTo))
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I should make all of the methods private