-
-
Save phoenix24/6097895 to your computer and use it in GitHub Desktop.
package spikes.cluster | |
import akka.actor._ | |
import com.typesafe.config.ConfigFactory | |
import akka.contrib.pattern.{ClusterClient, ClusterReceptionistExtension} | |
object DemoMaster { | |
def main(args: Array[String]): Unit = { | |
val config = ConfigFactory.parseString(""" | |
akka { | |
actor { | |
provider = "akka.cluster.ClusterActorRefProvider" | |
} | |
remote { | |
transport = "akka.remote.netty.NettyRemoteTransport" | |
log-remote-lifecycle-events = off | |
netty.tcp { | |
hostname = "127.0.0.1" | |
port = 2551 | |
} | |
} | |
cluster { | |
seed-nodes = [ | |
"akka.tcp://[email protected]:2551" | |
] | |
roles = [master] | |
auto-down = on | |
} | |
}""") | |
val system = ActorSystem("ClusterSystem", ConfigFactory.load(config)) | |
val master = system.actorOf(Props[ClusterMaster], "master") | |
ClusterReceptionistExtension(system).registerService(master) | |
} | |
class ClusterMaster extends Actor with ActorLogging { | |
def receive = { | |
case e => | |
log.info(s"from master : $e : $sender") | |
sender ! "master : how are you?" | |
} | |
} | |
} | |
object DemoMember { | |
def main(args: Array[String]) { | |
val config = ConfigFactory.parseString(""" | |
akka { | |
actor { | |
provider = "akka.cluster.ClusterActorRefProvider" | |
} | |
remote { | |
transport = "akka.remote.netty.NettyRemoteTransport" | |
log-remote-lifecycle-events = off | |
netty.tcp { | |
hostname = "127.0.0.1" | |
port = 3000 | |
} | |
} | |
cluster { | |
seed-nodes = [ | |
"akka.tcp://[email protected]:2551" | |
] | |
auto-down = on | |
} | |
}""") | |
val system = ActorSystem("ClusterSystem", ConfigFactory.load(config)) | |
val clusterMember = system.actorOf(Props[ClusterMember], "member") | |
ClusterReceptionistExtension(system).registerService(clusterMember) | |
} | |
class ClusterMember extends Actor with ActorLogging { | |
def receive = { | |
case e => | |
log.info(s"from member : $e : $sender") | |
sender ! "member : how are you?" | |
} | |
} | |
} | |
object DemoClient { | |
def main(args : Array[String]) { | |
val config = ConfigFactory.parseString(""" | |
akka { | |
actor { | |
provider = "akka.remote.RemoteActorRefProvider" | |
} | |
remote { | |
transport = "akka.remote.netty.NettyRemoteTransport" | |
log-remote-lifecycle-events = off | |
netty.tcp { | |
hostname = "127.0.0.1" | |
port = 5000 | |
} | |
} | |
}""") | |
val system = ActorSystem("OTHERSYSTEM", ConfigFactory.load(config)) | |
val initialContacts = Set( | |
system.actorSelection("akka.tcp://[email protected]:2551/user/receptionist"), | |
system.actorSelection("akka.tcp://[email protected]:3000/user/receptionist")) | |
val c = system.actorOf(ClusterClient.props(initialContacts), "os-client") | |
(1 to 1000).map { i => | |
c ! ClusterClient.Send("/user/master", s"hello - $i", localAffinity = true) | |
c ! ClusterClient.Send("/user/member", s"hello - $i", localAffinity = true) | |
Thread.sleep(1000) | |
} | |
} | |
} |
Thank you very much. This saved me a lot of work!
Excellent work... I was looking for this code. Thanks
using Akka 2.4.x It would require following changes:
-
import akka.contrib.pattern.{ClusterClient, ClusterReceptionistExtension} to import akka.cluster.client.{ClusterClient,ClusterClientReceptionist}
-
ClusterReceptionistExtension(system).registerService(master) to ClusterClientReceptionist(system).registerService(master)
-
val initialContacts = Set(
system.actorSelection("akka.tcp://[email protected]:2551/user/receptionist"),
system.actorSelection("akka.tcp://[email protected]:3000/user/receptionist")) toval initialContacts = Set(
system.actorSelection("akka.tcp://[email protected]:2551/system/receptionist"),
system.actorSelection("akka.tcp://[email protected]:3000/system/receptionist"))
To avoid dead-letters right after the receptionist cluster is up, add akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]
to the application.conf
I needed to update this code with latest Akka version. Also, I added code so ClusterClient sends it's own actorRef in order to get a reply back directly, and took out DemoMember. The code below is tested with sbt.version = 0.13.15.
build.sbt
name := "ClusterClient"
version := "0.1"
scalaVersion := "2.12.4"
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.14"
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-tools" % "2.5.14"
import akka.actor.{Actor, ActorLogging, ActorPath, ActorSystem, Props}
import akka.cluster.client.{ClusterClient, ClusterClientSettings}
import com.typesafe.config.ConfigFactory
object DemoClient {
def main(args : Array[String]) {
val config = ConfigFactory.parseString("""
akka {
log-dead-letters = OFF
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
log-remote-lifecycle-events = off
netty.tcp {
hostname = "localhost"
port = 5000
}
}
}""")
val system = ActorSystem("OUTSIDER-SYSTEM", ConfigFactory.load(config))
val initialContacts = Set(
ActorPath.fromString("akka.tcp://ClusterSystem@localhost:2551/system/receptionist"))
val cc = system.actorOf(ClusterClient.props(
ClusterClientSettings(system).withInitialContacts(initialContacts)), "os-client")
val ccActor = system.actorOf(Props[ClusterClientActor], "ccActor")
cc ! ClusterClient.Send("/user/master", ccActor, localAffinity = true)
(1 to 10).map { i =>
cc ! ClusterClient.Send("/user/master", s"hello - $i", localAffinity = true)
Thread.sleep(10000)
}
}
class ClusterClientActor extends Actor with ActorLogging {
def receive = {
case e =>
log.info(s"from cluster-client : $e : $sender")
}
}
}
import akka.actor.{ActorRef, _}
import akka.cluster.client.ClusterClientReceptionist
import com.typesafe.config.ConfigFactory
object DemoMaster {
def main(args: Array[String]): Unit = {
val config = ConfigFactory.parseString("""
akka {
log-dead-letters = OFF
extensions = ["akka.cluster.client.ClusterClientReceptionist"]
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
log-remote-lifecycle-events = off
netty.tcp {
hostname = "localhost"
port = 2551
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@localhost:2551"
]
roles = [master]
auto-down = on
}
}""")
val system = ActorSystem("ClusterSystem", ConfigFactory.load(config))
val master = system.actorOf(Props[ClusterMaster], "master")
ClusterClientReceptionist(system).registerService(master)
}
class ClusterMaster extends Actor with ActorLogging {
var senderActor : ActorRef = null;
def receive = {
case a: ActorRef =>
log.info(s"from master : $a : $sender")
senderActor = a
case e =>
log.info(s"from master : $e : $senderActor")
if(senderActor != null) senderActor ! "master : how are you?"
}
}
}
Replies can be received by an actor, so you have to create an actor on the client first. Then it would be most convenient to move the sending of the messages into that actor which will automatically set the right sender reference for the
!
method (lines 117/118). Handle messages of type String and you will see your replies.