Skip to content

Instantly share code, notes, and snippets.

@rocel
Created February 14, 2021 21:14
Show Gist options
  • Select an option

  • Save rocel/158f9913ccc3e30833a1f85f24b5fbc9 to your computer and use it in GitHub Desktop.

Select an option

Save rocel/158f9913ccc3e30833a1f85f24b5fbc9 to your computer and use it in GitHub Desktop.
package io.rocel.akka.demo
import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.util.Timeout
import com.typesafe.config.{Config, ConfigFactory}
import scala.concurrent.duration.DurationInt
/**
 * Actor that answers [[PingService.Ping]] requests with a [[PingService.Pong]].
 *
 * On start-up each instance registers itself with the receptionist under the
 * single shared [[PingService.key]], whatever `id` it was created with.
 */
object PingService {

  /** Receptionist key shared by every `PingService` instance. */
  val key: ServiceKey[Ping] = ServiceKey[Ping]("ping-serviceKey")

  sealed trait Command

  /** Request to be answered with a [[Pong]] sent to `replyTo`. */
  final case class Ping(replyTo: ActorRef[Pong]) extends Command

  /** Reply sent back to the `replyTo` of a [[Ping]]. */
  final case class Pong() extends Command

  /**
   * Creates the behavior of a ping service.
   *
   * @param id identifier of this instance; used to build its configuration
   * @return a behavior that registers itself with the receptionist and then
   *         replies to every [[Ping]]
   */
  def apply(id: String): Behavior[Command] =
    Behaviors.setup[Command] { context =>
      // Report the key that is really used for the registration below; the
      // per-id name "pingService-<id>" is never registered.
      context.log.info("New PingService registering with ServiceKey {}", key.id)
      context.log.info("PingService instance id is {}", id)
      // Using a new instance of key doing `ServiceKey[Ping](serviceKey)` will not work
      context.system.receptionist ! Receptionist.Register(key, context.self)

      // Placeholder: loaded eagerly, but not read by the handler below yet.
      val config = getConfig(id)

      Behaviors.receiveMessage {
        case Ping(replyTo) =>
          // Do something using `config`
          context.log.info("Pinged by {}", replyTo)
          replyTo ! Pong()
          Behaviors.same
        case Pong() =>
          // `Pong` is part of the sealed `Command` protocol, so it must be
          // matched; without this case such a message raises a MatchError.
          Behaviors.unhandled
      }
    }

  /**
   * Builds the configuration associated with `id`.
   *
   * NOTE(review): `atKey` wraps the loaded configuration under the key `id`;
   * it does not read the sub-section named `id`. If the latter was intended,
   * `getConfig(id)` on the loaded config would be the call to use - confirm.
   *
   * @param id key under which the loaded configuration is placed
   */
  def getConfig(id: String): Config = ConfigFactory.load().atKey(id)
}
/**
 * Front actor that asks the receptionist for registered [[PingService]]
 * instances whenever it receives a [[PingProxy.Ping]], and spawns a new
 * `PingService` when the returned listing is empty.
 */
object PingProxy {
  sealed trait Command

  /** Triggers a receptionist lookup for ping services. */
  final case class Ping(id: String) extends Command

  /** Acknowledgement message; it is only logged by this actor. */
  final case class Pong() extends Command

  /** Internal wrapper that turns receptionist listings into a [[Command]]. */
  private final case class ListingResponse(listing: Receptionist.Listing) extends Command

  /**
   * Creates the proxy behavior.
   *
   * @return a behavior handling [[Ping]], [[Pong]] and receptionist listings
   */
  def apply(): Behavior[Command] =
    Behaviors.setup[Command] { context =>
      context.log.info("Initializing QueryManager ...")
      val listingResponseAdapter =
        context.messageAdapter[Receptionist.Listing](ListingResponse)

      Behaviors.receiveMessage {
        case Ping(_) =>
          // The lookup always uses the shared `PingService.key`, so the id
          // carried by the message does not take part in it.
          // Using a new instance of key doing `ServiceKey[Ping](s"pingService-$id")` will not work
          context.system.receptionist ! Receptionist.Find(PingService.key, listingResponseAdapter)
          Behaviors.same

        case Pong() =>
          context.log.info("Received Pong")
          Behaviors.same

        case ListingResponse(listing) =>
          val key = listing.key
          context.log.info("Received listing for {}", key)
          // Resolve the instances once and reuse the result in both branches.
          val instances = listing.serviceInstances(key)
          if (instances.isEmpty) {
            context.log.info("Spawning new PingService for key {}", key)
            // NOTE(review): the listing key is the shared `PingService.key`
            // ("ping-serviceKey"), so `extractId` yields "serviceKey" here and
            // not the id from the triggering `Ping` - confirm this is intended.
            context.spawnAnonymous(PingService(extractId(key.id)))
          } else {
            context.log.info("Found existing instance of PingService with key {}", key)
          }
          // NOTE(review): pinging the service from here needs an
          // `ActorRef[PingService.Pong]` as `replyTo`; `context.self` accepts
          // `PingProxy.Command`, so a message adapter would be required.
          Behaviors.same
      }
    }

  /**
   * Returns the text after the last `-` of `key`.
   *
   * @param key a dash-separated name such as `pingService-123`
   * @return the last dash-separated segment, or `""` when splitting yields no
   *         segment at all (e.g. for `"-"`)
   */
  def extractId(key: String): String = key.split("-").lastOption.getOrElse("")
}
/**
 * Demo entry point: starts an actor system whose guardian is a [[PingProxy]],
 * sends it the same ping twice with a pause after each one, then terminates
 * the system.
 */
object DemoReceptionist extends App {
  // Declared implicit, although nothing in this object visibly consumes it.
  implicit val timeout: Timeout = 3.seconds

  val system = ActorSystem[PingProxy.Command](PingProxy(), "actor-system")

  // Each entry is the pause, in milliseconds, that follows one ping.
  Seq(3_000L, 1_000L).foreach { pauseMillis =>
    system ! PingProxy.Ping("123")
    Thread.sleep(pauseMillis)
  }

  system.terminate()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment