Created
February 14, 2021 21:14
-
-
Save rocel/158f9913ccc3e30833a1f85f24b5fbc9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package io.rocel.akka.demo | |
| import akka.actor.typed.receptionist.{Receptionist, ServiceKey} | |
| import akka.actor.typed.scaladsl.Behaviors | |
| import akka.actor.typed.{ActorRef, ActorSystem, Behavior} | |
| import akka.util.Timeout | |
| import com.typesafe.config.{Config, ConfigFactory} | |
| import scala.concurrent.duration.DurationInt | |
/** Demo actor that registers itself with the receptionist and answers `Ping` with `Pong`. */
object PingService {

  /** Single, shared key under which every `PingService` instance registers.
    *
    * `PingProxy` uses this same value for its `Receptionist.Find` lookups.
    */
  val key: ServiceKey[Ping] = ServiceKey[Ping]("ping-serviceKey")

  /** Message protocol of the service. */
  sealed trait Command

  /** Request; the service answers with a `Pong` sent to `replyTo`. */
  final case class Ping(replyTo: ActorRef[Pong]) extends Command

  /** Reply sent back to the `replyTo` of a `Ping`. */
  final case class Pong() extends Command

  /** Builds the behavior of a service instance.
    *
    * @param id identifier used for the log line and passed to `getConfig`
    * @return a behavior that registers itself under `key` on start-up and replies to pings
    */
  def apply(id: String): Behavior[Command] =
    Behaviors.setup { context =>
      // NOTE(review): this name is only logged; the registration below uses the shared
      // `key` ("ping-serviceKey"), not a key built from this string.
      val serviceKey = s"pingService-$id"
      context.log.info("New PingService registering with ServiceKey {}", serviceKey)
      // Using a new instance of key doing `ServiceKey[Ping](serviceKey)` will not work
      context.system.receptionist ! Receptionist.Register(key, context.self)
      // Placeholder: loaded but not read yet by the message handler.
      val config = getConfig(id)

      Behaviors.receiveMessage {
        case Ping(replyTo) =>
          // Do something using `config`
          context.log.info("Pinged by {}", replyTo)
          replyTo ! Pong()
          Behaviors.same
        case Pong() =>
          // `Pong` is part of `Command` but is only ever a reply; handling it explicitly
          // keeps the match exhaustive so a stray `Pong` cannot raise a MatchError.
          Behaviors.unhandled
      }
    }

  /** Loads the default configuration and wraps it with `atKey(id)`.
    *
    * NOTE(review): `atKey` nests the whole loaded config under the path `id`; if the
    * intent was to read the sub-section named `id`, confirm whether `getConfig(id)` on
    * the loaded config was meant instead.
    *
    * @param id key under which the loaded configuration is placed
    */
  def getConfig(id: String): Config = ConfigFactory.load().atKey(id)
}
/** Demo front actor: on every `Ping` it asks the receptionist for registered
  * `PingService` instances and spawns one when the listing is empty.
  */
object PingProxy {

  /** Message protocol of the proxy. */
  sealed trait Command

  /** Triggers a receptionist lookup for `PingService.key`. */
  final case class Ping(id: String) extends Command

  /** Reply message; only logged by the proxy. */
  final case class Pong() extends Command

  /** Internal wrapper that turns a receptionist `Listing` into a `Command`. */
  private case class ListingResponse(listing: Receptionist.Listing) extends Command

  /** Builds the proxy behavior.
    *
    * @return a behavior that looks up `PingService.key` on each `Ping` and spawns a new
    *         `PingService` when the returned listing contains no instance
    */
  def apply(): Behavior[Command] =
    Behaviors.setup[Command] { context =>
      context.log.info("Initializing PingProxy ...")
      val listingResponseAdapter =
        context.messageAdapter[Receptionist.Listing](ListingResponse)

      Behaviors.receiveMessage {
        case Ping(_) =>
          // The id carried by the Ping is not used by the lookup: every service
          // registers under the single shared `PingService.key`.
          // Using a new instance of key doing `ServiceKey[Ping](s"pingService-$id")` will not work
          context.system.receptionist ! Receptionist.Find(PingService.key, listingResponseAdapter)
          Behaviors.same

        case Pong() =>
          context.log.info("Received Pong")
          Behaviors.same

        case ListingResponse(listing) =>
          val key = listing.key
          context.log.info("Received listing for {}", key)
          if (listing.serviceInstances(key).isEmpty) {
            context.log.info("Spawning new PingService for key {}", key)
            // NOTE(review): the id handed to the new service is derived from the key's id
            // ("ping-serviceKey" -> "serviceKey"), not from the id of the original Ping.
            context.spawnAnonymous(PingService(extractId(key.id)))
            // Forwarding is disabled: `PingService.Ping` expects an
            // `ActorRef[PingService.Pong]`, which `context.self` is not.
            // ref ! PingService.Ping(context.self)
          } else {
            context.log.info("Found existing instance of PingService with key {}", key)
            // Forwarding to the found instance is disabled for the same reason:
            // listing.serviceInstances(key).head ! PingService.Ping(context.self)
          }
          Behaviors.same
      }
    }

  /** Returns the text after the last `-` separator of `key`.
    *
    * Yields the empty string when splitting produces no segment at all (for example
    * when `key` consists only of dashes) instead of throwing.
    *
    * @param key dash-separated identifier
    */
  def extractId(key: String): String = key.split("-").lastOption.getOrElse("")
}
/** Entry point of the demo: starts the actor system with `PingProxy` as guardian,
  * sends it two pings a few seconds apart and then shuts the system down.
  */
object DemoReceptionist extends App {
  implicit val timeout: Timeout = 3.seconds

  val system = ActorSystem[PingProxy.Command](PingProxy(), "actor-system")

  try {
    system ! PingProxy.Ping("123")
    // Pause between the two pings so the asynchronous lookup triggered by the first
    // one has time to run before the second is sent.
    Thread.sleep(3_000)
    system ! PingProxy.Ping("123")
    Thread.sleep(1_000)
  } finally {
    // Always shut the actor system down, even if a sleep is interrupted.
    system.terminate()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment