-
-
Save phoenix24/6097895 to your computer and use it in GitHub Desktop.
package spikes.cluster | |
import akka.actor._ | |
import com.typesafe.config.ConfigFactory | |
import akka.contrib.pattern.{ClusterClient, ClusterReceptionistExtension} | |
object DemoMaster { | |
def main(args: Array[String]): Unit = { | |
val config = ConfigFactory.parseString(""" | |
akka { | |
actor { | |
provider = "akka.cluster.ClusterActorRefProvider" | |
} | |
remote { | |
transport = "akka.remote.netty.NettyRemoteTransport" | |
log-remote-lifecycle-events = off | |
netty.tcp { | |
hostname = "127.0.0.1" | |
port = 2551 | |
} | |
} | |
cluster { | |
seed-nodes = [ | |
"akka.tcp://[email protected]:2551" | |
] | |
roles = [master] | |
auto-down = on | |
} | |
}""") | |
val system = ActorSystem("ClusterSystem", ConfigFactory.load(config)) | |
val master = system.actorOf(Props[ClusterMaster], "master") | |
ClusterReceptionistExtension(system).registerService(master) | |
} | |
class ClusterMaster extends Actor with ActorLogging { | |
def receive = { | |
case e => | |
log.info(s"from master : $e : $sender") | |
sender ! "master : how are you?" | |
} | |
} | |
} | |
object DemoMember { | |
def main(args: Array[String]) { | |
val config = ConfigFactory.parseString(""" | |
akka { | |
actor { | |
provider = "akka.cluster.ClusterActorRefProvider" | |
} | |
remote { | |
transport = "akka.remote.netty.NettyRemoteTransport" | |
log-remote-lifecycle-events = off | |
netty.tcp { | |
hostname = "127.0.0.1" | |
port = 3000 | |
} | |
} | |
cluster { | |
seed-nodes = [ | |
"akka.tcp://[email protected]:2551" | |
] | |
auto-down = on | |
} | |
}""") | |
val system = ActorSystem("ClusterSystem", ConfigFactory.load(config)) | |
val clusterMember = system.actorOf(Props[ClusterMember], "member") | |
ClusterReceptionistExtension(system).registerService(clusterMember) | |
} | |
class ClusterMember extends Actor with ActorLogging { | |
def receive = { | |
case e => | |
log.info(s"from member : $e : $sender") | |
sender ! "member : how are you?" | |
} | |
} | |
} | |
object DemoClient { | |
def main(args : Array[String]) { | |
val config = ConfigFactory.parseString(""" | |
akka { | |
actor { | |
provider = "akka.remote.RemoteActorRefProvider" | |
} | |
remote { | |
transport = "akka.remote.netty.NettyRemoteTransport" | |
log-remote-lifecycle-events = off | |
netty.tcp { | |
hostname = "127.0.0.1" | |
port = 5000 | |
} | |
} | |
}""") | |
val system = ActorSystem("OTHERSYSTEM", ConfigFactory.load(config)) | |
val initialContacts = Set( | |
system.actorSelection("akka.tcp://[email protected]:2551/user/receptionist"), | |
system.actorSelection("akka.tcp://[email protected]:3000/user/receptionist")) | |
val c = system.actorOf(ClusterClient.props(initialContacts), "os-client") | |
(1 to 1000).map { i => | |
c ! ClusterClient.Send("/user/master", s"hello - $i", localAffinity = true) | |
c ! ClusterClient.Send("/user/member", s"hello - $i", localAffinity = true) | |
Thread.sleep(1000) | |
} | |
} | |
} |
Excellent work... I was looking for this code. Thanks
using Akka 2.4.x It would require following changes:
-
import akka.contrib.pattern.{ClusterClient, ClusterReceptionistExtension} to import akka.cluster.client.{ClusterClient,ClusterClientReceptionist}
-
ClusterReceptionistExtension(system).registerService(master) to ClusterClientReceptionist(system).registerService(master)
-
val initialContacts = Set(
system.actorSelection("akka.tcp://[email protected]:2551/user/receptionist"),
system.actorSelection("akka.tcp://[email protected]:3000/user/receptionist")) toval initialContacts = Set(
system.actorSelection("akka.tcp://[email protected]:2551/system/receptionist"),
system.actorSelection("akka.tcp://[email protected]:3000/system/receptionist"))
To avoid dead-letters right after the receptionist cluster is up, add akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"]
to the application.conf
I needed to update this code with latest Akka version. Also, I added code so ClusterClient sends it's own actorRef in order to get a reply back directly, and took out DemoMember. The code below is tested with sbt.version = 0.13.15.
build.sbt
name := "ClusterClient"
version := "0.1"
scalaVersion := "2.12.4"
resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.14"
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-tools" % "2.5.14"
import akka.actor.{Actor, ActorLogging, ActorPath, ActorSystem, Props}
import akka.cluster.client.{ClusterClient, ClusterClientSettings}
import com.typesafe.config.ConfigFactory
object DemoClient {
def main(args : Array[String]) {
val config = ConfigFactory.parseString("""
akka {
log-dead-letters = OFF
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
log-remote-lifecycle-events = off
netty.tcp {
hostname = "localhost"
port = 5000
}
}
}""")
val system = ActorSystem("OUTSIDER-SYSTEM", ConfigFactory.load(config))
val initialContacts = Set(
ActorPath.fromString("akka.tcp://ClusterSystem@localhost:2551/system/receptionist"))
val cc = system.actorOf(ClusterClient.props(
ClusterClientSettings(system).withInitialContacts(initialContacts)), "os-client")
val ccActor = system.actorOf(Props[ClusterClientActor], "ccActor")
cc ! ClusterClient.Send("/user/master", ccActor, localAffinity = true)
(1 to 10).map { i =>
cc ! ClusterClient.Send("/user/master", s"hello - $i", localAffinity = true)
Thread.sleep(10000)
}
}
class ClusterClientActor extends Actor with ActorLogging {
def receive = {
case e =>
log.info(s"from cluster-client : $e : $sender")
}
}
}
import akka.actor.{ActorRef, _}
import akka.cluster.client.ClusterClientReceptionist
import com.typesafe.config.ConfigFactory
object DemoMaster {
def main(args: Array[String]): Unit = {
val config = ConfigFactory.parseString("""
akka {
log-dead-letters = OFF
extensions = ["akka.cluster.client.ClusterClientReceptionist"]
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteTransport"
log-remote-lifecycle-events = off
netty.tcp {
hostname = "localhost"
port = 2551
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@localhost:2551"
]
roles = [master]
auto-down = on
}
}""")
val system = ActorSystem("ClusterSystem", ConfigFactory.load(config))
val master = system.actorOf(Props[ClusterMaster], "master")
ClusterClientReceptionist(system).registerService(master)
}
class ClusterMaster extends Actor with ActorLogging {
var senderActor : ActorRef = null;
def receive = {
case a: ActorRef =>
log.info(s"from master : $a : $sender")
senderActor = a
case e =>
log.info(s"from master : $e : $senderActor")
if(senderActor != null) senderActor ! "master : how are you?"
}
}
}
Thank you very much. This saved me a lot of work!