@bhudgeons
Created May 8, 2014 22:09
Demonstration of Akka Cluster Singleton Communication Problem
package sample.cluster.simple

import com.typesafe.config.ConfigFactory
import akka.actor.ActorSystem
import akka.actor.Props
import akka.actor._
import akka.contrib.pattern.ClusterSingletonManager
import akka.contrib.pattern.ClusterSingletonProxy

// The cluster singleton: prints every message it receives and replies to the sender.
class MyClusterActor extends Actor {
  def receive = {
    case msg => {
      println("MyClusterActor got *" + msg + "* from " + sender().path)
      sender() ! "Here's a response from MyClusterActor"
    }
  }
}

case class Start(actor: ActorRef)

// On Start, sends a greeting to the given actor; prints anything else it receives.
class MyRemoteActor extends Actor {
  def receive = {
    case Start(actor) => {
      println("MyRemoteActor got Start with " + actor.path)
      actor ! "Hello from RemoteActor"
    }
    case msg => {
      println("MyRemoteActor got *" + msg + "* from " + sender().path)
    }
  }
}

// VM #2: a plain remoting system (port 2551) plus a cluster node (port 2552)
// that hosts a proxy to the singleton.
object RemoteSystem {
  val remoteConfig = ConfigFactory.parseString("akka.actor.provider=\"akka.remote.RemoteActorRefProvider\"").
    withFallback(ConfigFactory.parseString("akka.remote.enabled-transports=[\"akka.remote.netty.tcp\"]")).
    withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname=\"127.0.0.1\"")).
    withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=2551"))
  val remoteSystem = ActorSystem("RemoteSystem", remoteConfig)
  val remoteActor = remoteSystem.actorOf(Props[MyRemoteActor], "myremoteactor")

  val clusterConfig = ConfigFactory.parseString("akka.actor.provider=\"akka.cluster.ClusterActorRefProvider\"").
    withFallback(ConfigFactory.parseString("akka.remote.enabled-transports=[\"akka.remote.netty.tcp\"]")).
    withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname=\"127.0.0.1\"")).
    withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=[\"akka.tcp://ClusterSystem@127.0.0.1:2553\"]")).
    withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=2552"))
  val clusterSystem = ActorSystem("ClusterSystem", clusterConfig)
  val clusterActor = clusterSystem.actorOf(ClusterSingletonProxy.props(
    singletonPath = "/user/myclusteractor/active",
    role = None),
    name = "MyClusterActor")
}

// VM #1: the seed node (port 2553) that runs the singleton manager.
object ClusterSystem {
  val clusterConfig = ConfigFactory.parseString("akka.actor.provider=\"akka.cluster.ClusterActorRefProvider\"").
    withFallback(ConfigFactory.parseString("akka.remote.enabled-transports=[\"akka.remote.netty.tcp\"]")).
    withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.hostname=\"127.0.0.1\"")).
    withFallback(ConfigFactory.parseString("akka.cluster.seed-nodes=[\"akka.tcp://ClusterSystem@127.0.0.1:2553\"]")).
    withFallback(ConfigFactory.parseString("akka.remote.netty.tcp.port=2553"))
  val clusterSystem = ActorSystem("ClusterSystem", clusterConfig)
  clusterSystem.actorOf(ClusterSingletonManager.props(Props(classOf[MyClusterActor]), "active",
    PoisonPill, None), "myclusteractor")

  // Calling this forces initialization of the object, which starts the system above.
  def setup() = println("Setting up ClusterSystem")
}
@bhudgeons
Author

For convenience, you can drop this file into akka-samples/akka-sample-cluster-scala/src/main/scala/sample/cluster/simple/ and do this:

On VM #1:

$ sbt
[info] Loading project definition from /Users/brandon/Documents/workspace/akka/project
[info] Set current project to akka (in build file:/Users/brandon/Documents/workspace/akka/)
akka > akka-sample-cluster-scala/console
[info] Starting scala interpreter...
[info]
import language.postfixOps
import akka.actor._
import ActorDSL._
import scala.concurrent._
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import akka.util.Timeout
config: com.typesafe.config.Config = Config(SimpleConfigObject({"pinned":{"executor":"thread-pool-executor","type":"PinnedDispatcher","throughput":1000},"akka":{"loglevel":"DEBUG","stdout-loglevel":"INFO"}}))
remoteConfig: com.typesafe.config.Config = Config(SimpleConfigObject({"pinned":{"executor":"thread-pool-executor","type":"PinnedDispatcher","throughput":1000},"akka":{"loglevel":"DEBUG","stdout-loglevel":"INFO","actor":{"provider":"akka.remote.RemoteActorRefProvider"},"remote":{"netty":{"execution-pool-size":0,"port":0,"use-dispatcher-for-io":"akka.actor.default-dispatcher"}}}}))
system: akka....Welcome to Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_04).
Type in expressions to have them evaluated.
Type :help for more information.

scala> import sample.cluster.simple._
import sample.cluster.simple._

scala> ClusterSystem.setup()
[INFO] [05/08/2014 17:02:29.155] [run-main-0] [Remoting] Starting remoting
[INFO] [05/08/2014 17:02:29.320] [run-main-0] [Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2553]
[INFO] [05/08/2014 17:02:29.322] [run-main-0] [Remoting] Remoting now listens on addresses: [akka.tcp://ClusterSystem@127.0.0.1:2553]
[INFO] [05/08/2014 17:02:29.334] [run-main-0] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2553] - Starting up...
[INFO] [05/08/2014 17:02:29.392] [run-main-0] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2553] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [05/08/2014 17:02:29.392] [run-main-0] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2553] - Started up successfully
[INFO] [05/08/2014 17:02:29.422] [ClusterSystem-akka.actor.default-dispatcher-14] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2553] - Node [akka.tcp://ClusterSystem@127.0.0.1:2553] is JOINING, roles []
Setting up ClusterSystem

[Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2553] - Metrics collection has started successfully
[INFO] [05/08/2014 17:02:30.428] [ClusterSystem-akka.actor.default-dispatcher-4] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2553] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2553] to [Up]
[INFO] [05/08/2014 17:02:30.441] [ClusterSystem-akka.actor.default-dispatcher-20] [akka.tcp://ClusterSystem@127.0.0.1:2553/user/myclusteractor] Singleton manager [akka.tcp://ClusterSystem@127.0.0.1:2553] starting singleton actor
[INFO] [05/08/2014 17:02:30.443] [ClusterSystem-akka.actor.default-dispatcher-20] [akka.tcp://ClusterSystem@127.0.0.1:2553/user/myclusteractor] ClusterSingletonManager state change [Start -> Oldest]

So, now the cluster is up and running on port 2553, and the singleton (myclusteractor) is running on it.

Now, on VM #2:

$ sbt
[warn] The global sbt directory is now versioned and is located at /Users/brandon/.sbt/0.13.
[warn]   You are seeing this warning because there is global configuration in /Users/brandon/.sbt but not in /Users/brandon/.sbt/0.13.
[warn]   The global sbt directory may be changed via the sbt.global.base system property.
[info] Loading project definition from /Users/brandon/Documents/workspace/akka/project
[info] Set current project to akka (in build file:/Users/brandon/Documents/workspace/akka/)
akka > akka-sample-cluster-scala/console
[info] Starting scala interpreter...
[info]
import language.postfixOps
import akka.actor._
import ActorDSL._
import scala.concurrent._
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import akka.util.Timeout
config: com.typesafe.config.Config = Config(SimpleConfigObject({"pinned":{"executor":"thread-pool-executor","type":"PinnedDispatcher","throughput":1000},"akka":{"loglevel":"DEBUG","stdout-loglevel":"INFO"}}))
remoteConfig: com.typesafe.config.Config = Config(SimpleConfigObject({"pinned":{"executor":"thread-pool-executor","type":"PinnedDispatcher","throughput":1000},"akka":{"loglevel":"DEBUG","stdout-loglevel":"INFO","actor":{"provider":"akka.remote.RemoteActorRefProvider"},"remote":{"netty":{"execution-pool-size":0,"port":0,"use-dispatcher-for-io":"akka.actor.default-dispatcher"}}}}))
system: akka....Welcome to Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_04).
Type in expressions to have them evaluated.
Type :help for more information.

scala> import sample.cluster.simple._
import sample.cluster.simple._

scala> RemoteSystem.remoteActor ! "sending messages to this system works"
[INFO] [05/08/2014 17:03:41.582] [run-main-0] [Remoting] Starting remoting
[INFO] [05/08/2014 17:03:41.743] [run-main-0] [Remoting] Remoting started; listening on addresses :[akka.tcp://RemoteSystem@127.0.0.1:2551]
[INFO] [05/08/2014 17:03:41.746] [run-main-0] [Remoting] Remoting now listens on addresses: [akka.tcp://RemoteSystem@127.0.0.1:2551]
[INFO] [05/08/2014 17:03:41.779] [run-main-0] [Remoting] Starting remoting
[INFO] [05/08/2014 17:03:41.789] [run-main-0] [Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2552]
[INFO] [05/08/2014 17:03:41.789] [run-main-0] [Remoting] Remoting now listens on addresses: [akka.tcp://ClusterSystem@127.0.0.1:2552]
[INFO] [05/08/2014 17:03:41.799] [run-main-0] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Starting up...
[INFO] [05/08/2014 17:03:41.857] [run-main-0] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Registered cluster JMX MBean [akka:type=Cluster]
[INFO] [05/08/2014 17:03:41.857] [run-main-0] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Started up successfully

MyRemoteActor got *sending messages to this system works* from akka://RemoteSystem/deadLetters

[INFO] [05/08/2014 17:03:41.977] [ClusterSystem-akka.actor.default-dispatcher-3] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Metrics collection has started successfully
[INFO] [05/08/2014 17:03:42.201] [ClusterSystem-akka.actor.default-dispatcher-2] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2553]
[INFO] [05/08/2014 17:03:42.300] [ClusterSystem-akka.actor.default-dispatcher-3] [akka.tcp://ClusterSystem@127.0.0.1:2552/user/MyClusterActor] Singleton identified: akka.tcp://ClusterSystem@127.0.0.1:2553/user/myclusteractor/active

Now the cluster node on VM #2 is up on port 2552. It found the seed node on port 2553, joined the cluster, and identified the singleton. VM #2 also runs a separate actor system, RemoteSystem, set up for remoting on port 2551.

Back on VM #1, we see it acknowledge the new node:

[INFO] [05/08/2014 17:03:42.119] [ClusterSystem-akka.actor.default-dispatcher-14] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2553] - Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles []
[INFO] [05/08/2014 17:03:42.409] [ClusterSystem-akka.actor.default-dispatcher-4] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2553] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2552] to [Up]

Now, back on VM #2, we send a message to the singleton:

scala> RemoteSystem.clusterActor ! "I can reach the cluster singleton in the other VM"

Back on VM #1, we see that the cluster actor receives the message. (We don't get a response, because we sent the message from the console rather than from an actor, so sender() is deadLetters and the reply has nowhere to go.)

MyClusterActor got *I can reach the cluster singleton in the other VM* from akka.tcp://ClusterSystem@127.0.0.1:2552/deadLetters
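
To actually see a reply from the console, the message needs a sender that can receive one. A minimal sketch using Akka's ask pattern follows; `AskSketch` and `askOnce` are hypothetical names that are not part of the gist, and blocking on the result is only reasonable in a REPL session like this one. Whether the reply makes it back through the singleton proxy in this particular setup is an assumption, not something the logs here show.

```scala
import akka.actor.ActorRef
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

object AskSketch {
  // Hypothetical helper: sends `msg` with the ask pattern and blocks for the reply.
  def askOnce(target: ActorRef, msg: Any, wait: FiniteDuration = 5.seconds): Any = {
    implicit val timeout: Timeout = Timeout(wait)
    // `?` registers a temporary actor as the sender, so a reply
    // to sender() has somewhere to go (unlike a bare `!` from the console).
    val reply = target ? msg
    Await.result(reply, wait)
  }
}
```

On VM #2 this could be tried with `AskSketch.askOnce(RemoteSystem.clusterActor, "I can reach the cluster singleton in the other VM")`.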

Now, back on VM #2, we send a Start message to the remoteActor, passing it the proxy for the cluster singleton. The remoteActor will then send a message to the singleton, and should receive a message back when the singleton responds to sender():

scala> RemoteSystem.remoteActor ! Start(RemoteSystem.clusterActor)

MyRemoteActor got Start with akka://ClusterSystem/user/MyClusterActor

On VM #1, we see that the singleton gets the message from remoteActor:

MyClusterActor got *Hello from RemoteActor* from akka.tcp://ClusterSystem@127.0.0.1:2552/user/myremoteactor

Note, however, that the address for sender() is not correct. myremoteactor lives in RemoteSystem (port 2551), but the address names ClusterSystem (port 2552).

In VM #2, we see that the attempt to respond back to remoteActor indeed fails:

[INFO] [05/08/2014 17:06:36.792] [ClusterSystem-akka.actor.default-dispatcher-16] [akka://ClusterSystem/user/myremoteactor] Message [java.lang.String] from Actor[akka.tcp://ClusterSystem@127.0.0.1:2553/user/myclusteractor/active#1651707184] to Actor[akka://ClusterSystem/user/myremoteactor#-95621122] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
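
One reading of the logs above is that the trouble comes from handing a local ActorRef that belongs to ClusterSystem (the proxy) to an actor that lives in RemoteSystem inside the same JVM, so the sender's path ends up stamped with ClusterSystem's address when the proxy forwards the message. That is an assumption about the cause, not something this gist confirms. Under that assumption, a possible workaround is to have the remote actor reach the proxy through its full remote path, so the message leaves through RemoteSystem's own remoting. `StartWithPath` and `MyRemoteActorViaSelection` below are hypothetical names introduced only for this sketch.

```scala
import akka.actor._

// Hypothetical message (not in the gist): carries the proxy's full remote
// path as a string instead of a local ActorRef.
case class StartWithPath(path: String)

// Hypothetical variant of MyRemoteActor that looks the target up by path.
class MyRemoteActorViaSelection extends Actor {
  def receive = {
    case StartWithPath(path) =>
      println("MyRemoteActorViaSelection got StartWithPath with " + path)
      // With a full akka.tcp:// path, the message is sent through the
      // remoting layer of the system this actor lives in.
      context.actorSelection(path) ! "Hello from RemoteActor"
    case msg =>
      println("MyRemoteActorViaSelection got *" + msg + "* from " + sender().path)
  }
}

// Possible use from the console on VM #2 (untested in this setup):
//   val viaSelection = RemoteSystem.remoteSystem.actorOf(
//     Props[MyRemoteActorViaSelection], "viaselection")
//   viaSelection ! StartWithPath(
//     "akka.tcp://ClusterSystem@127.0.0.1:2552/user/MyClusterActor")
```

The design choice here is to pass a path string rather than an ActorRef, which keeps each actor system resolving references on its own side.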

patriknw commented May 9, 2014

Tip: Scala supports triple-quoted multi-line strings, so the config does not need escaped quotes:

parseString("""akka.remote.enabled-transports="akka.remote.netty.tcp"
                     akka.remote.netty.tcp.hostname="127.0.0.1"
                 """)
