Skip to content

Instantly share code, notes, and snippets.

@lucasrpb
Created March 6, 2021 01:03
Show Gist options
  • Save lucasrpb/1143a365545de2097513025f4090cd8e to your computer and use it in GitHub Desktop.
Save lucasrpb/1143a365545de2097513025f4090cd8e to your computer and use it in GitHub Desktop.
How to init and stop Akka Typed sharded singletons around the cluster
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorSystem, Behavior}
import akka.cluster.sharding.typed.scaladsl.{EntityTypeKey, ShardedDaemonProcess}
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
/**
 * Application entry point: starts one Akka Typed `ActorSystem` per requested
 * port, and each system's root behavior registers the "workers" sharded daemon
 * process so that the `Worker` actors are spread around the cluster.
 */
object Main {

  // NOTE(review): `Greeter` is not defined in the visible source and this key is
  // not used in this object; presumably both belong to another file — confirm.
  val TypeKey: EntityTypeKey[Greeter.Command] = EntityTypeKey[Greeter.Command]("Greeter")

  // Method-call syntax (`5.seconds`) instead of the postfix form (`5 seconds`),
  // which needs `scala.language.postfixOps` to compile on recent Scala versions.
  // The explicit type keeps implicit resolution predictable.
  implicit val timeout: Timeout = Timeout(5.seconds)

  /** Guardian behavior of every node started by [[startup]]. */
  object RootBehavior {

    /**
     * Registers the sharded daemon process and then handles no messages.
     *
     * @param port the port the node was started on; currently unused, kept so
     *             existing callers keep compiling
     * @return an empty behavior (the guardian itself processes nothing)
     */
    def apply(port: Int): Behavior[Nothing] = Behaviors.setup[Nothing] { context =>
      // Create an actor that handles cluster domain events
      //context.spawn(ClusterListener(), "ClusterListener")

      // Vector gives constant-time lookup by the instance index passed to the factory.
      val workers = Vector("w0", "w1", "w2")

      // One daemon instance per tag; `Worker.StopWorker` is registered as the
      // message used to ask an instance to stop.
      ShardedDaemonProcess(context.system)
        .init("workers", workers.length, id => Worker(workers(id)), Worker.StopWorker)

      Behaviors.empty
    }
  }

  /**
   * Starts one node per port given on the command line, or on 2551 and 2552
   * when no arguments are supplied.
   *
   * @param args port numbers; each argument must parse as an `Int`
   *             (`toInt` throws `NumberFormatException` otherwise)
   */
  def main(args: Array[String]): Unit = {
    val ports =
      if (args.isEmpty) Seq(2551, 2552)
      else args.toSeq.map(_.toInt)

    ports.foreach(startup)
  }

  /**
   * Boots a single actor system named "PubSub" bound to the given Artery port.
   *
   * @param port value used for `akka.remote.artery.canonical.port`
   */
  def startup(port: Int): Unit = {
    // Override the configuration of the port; everything else comes from the
    // default configuration loaded by `ConfigFactory.load()`.
    val config = ConfigFactory
      .parseString(
        s"""
           |akka.remote.artery.canonical.port=$port
           |""".stripMargin
      )
      .withFallback(ConfigFactory.load())

    // Create an Akka system
    ActorSystem[Nothing](RootBehavior(port), "PubSub", config)
  }
}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ActorRef, Behavior}
import java.util.concurrent.TimeUnit
import java.util.{Timer, TimerTask}
/**
 * A worker actor that prints a "tick" line once per second from a
 * `java.util.Timer` until it is stopped.
 */
object Worker {
  // Local import so this object does not depend on the file-level import list.
  import akka.actor.typed.{PostStop, PreRestart}

  /** Messages understood by the worker. */
  sealed trait Command

  /** Asks the worker to stop itself. */
  final case object StopWorker extends Command

  /**
   * Creates the worker behavior and immediately starts its ticking timer.
   *
   * @param tag label included in the start-up log entry and in every tick line
   * @return a behavior that stops on [[StopWorker]] and cancels the timer
   *         whenever the actor stops or is restarted
   */
  def apply(tag: String): Behavior[Command] = Behaviors.setup { ctx =>
    // start the processing ...
    ctx.log.info(s"${Console.GREEN_B}Starting worker ${tag}${Console.RESET}\n")
    //println((s"${Console.GREEN_B}Starting worker {}${Console.RESET}\n", tag))

    val timer = new Timer()
    timer.scheduleAtFixedRate(
      new TimerTask {
        override def run(): Unit =
          println(s"${Console.MAGENTA_B}tick for ${tag}${Console.RESET}\n")
      },
      0L,   // first tick fires immediately
      1000L // then once per second
    )

    Behaviors
      .receiveMessage[Command] {
        // `Command` is sealed with a single member, so no catch-all case is
        // needed and the compiler can check exhaustiveness.
        case StopWorker => Behaviors.stopped
      }
      .receiveSignal {
        // Cancel the timer here rather than only in the StopWorker branch, so
        // its thread is also released when the actor is stopped by other means
        // or restarted (a restart re-runs setup and creates a fresh Timer).
        case (_, PostStop | PreRestart) =>
          timer.cancel()
          Behaviors.same
      }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment