Skip to content

Instantly share code, notes, and snippets.

@phoenix24
Last active May 19, 2022 18:12
Show Gist options
  • Save phoenix24/6097895 to your computer and use it in GitHub Desktop.
Save phoenix24/6097895 to your computer and use it in GitHub Desktop.
sample akka cluster client.
package spikes.cluster
import akka.actor._
import com.typesafe.config.ConfigFactory
import akka.contrib.pattern.{ClusterClient, ClusterReceptionistExtension}
/**
 * Cluster node that hosts the "master" service.
 *
 * Starts the `ClusterSystem` actor system on 127.0.0.1:2551, which is also the
 * only seed node listed in its own configuration, creates the `master` actor
 * and registers it with the cluster receptionist so that it can be addressed
 * through a `ClusterClient`.
 */
object DemoMaster {

  /** Boots the actor system and registers the master actor with the receptionist. */
  def main(args: Array[String]): Unit = {
    // The seed-node address must match the actor system name and the
    // hostname/port configured under netty.tcp below.
    val config = ConfigFactory.parseString("""
      akka {
        actor {
          provider = "akka.cluster.ClusterActorRefProvider"
        }
        remote {
          transport = "akka.remote.netty.NettyRemoteTransport"
          log-remote-lifecycle-events = off
          netty.tcp {
            hostname = "127.0.0.1"
            port = 2551
          }
        }
        cluster {
          seed-nodes = [
            "akka.tcp://ClusterSystem@127.0.0.1:2551"
          ]
          roles = [master]
          auto-down = on
        }
      }""")

    val system = ActorSystem("ClusterSystem", ConfigFactory.load(config))
    val master = system.actorOf(Props[ClusterMaster], "master")
    ClusterReceptionistExtension(system).registerService(master)
  }

  /** Logs every incoming message together with its sender and answers with a fixed greeting. */
  class ClusterMaster extends Actor with ActorLogging {
    def receive = {
      case e =>
        log.info(s"from master : $e : $sender")
        sender ! "master : how are you?"
    }
  }
}
/**
 * Second cluster node, hosting the "member" service.
 *
 * Starts a `ClusterSystem` actor system on 127.0.0.1:3000 that lists the node
 * on port 2551 as its seed node, creates the `member` actor and registers it
 * with the cluster receptionist.
 */
object DemoMember {

  /** Boots the actor system and registers the member actor with the receptionist. */
  def main(args: Array[String]): Unit = {
    // The seed node is the address of the node started by DemoMaster.
    val config = ConfigFactory.parseString("""
      akka {
        actor {
          provider = "akka.cluster.ClusterActorRefProvider"
        }
        remote {
          transport = "akka.remote.netty.NettyRemoteTransport"
          log-remote-lifecycle-events = off
          netty.tcp {
            hostname = "127.0.0.1"
            port = 3000
          }
        }
        cluster {
          seed-nodes = [
            "akka.tcp://ClusterSystem@127.0.0.1:2551"
          ]
          auto-down = on
        }
      }""")

    val system = ActorSystem("ClusterSystem", ConfigFactory.load(config))
    val clusterMember = system.actorOf(Props[ClusterMember], "member")
    ClusterReceptionistExtension(system).registerService(clusterMember)
  }

  /** Logs every incoming message together with its sender and answers with a fixed greeting. */
  class ClusterMember extends Actor with ActorLogging {
    def receive = {
      case e =>
        log.info(s"from member : $e : $sender")
        sender ! "member : how are you?"
    }
  }
}
/**
 * Stand-alone client that talks to the cluster through a `ClusterClient`.
 *
 * Starts the `OTHERSYSTEM` actor system (remoting only, port 5000), points a
 * `ClusterClient` at the receptionists on ports 2551 and 3000, and sends one
 * greeting per second to `/user/master` and `/user/member`.
 */
object DemoClient {

  /** Boots the client system and sends 1000 rounds of greetings, one round per second. */
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.parseString("""
      akka {
        actor {
          provider = "akka.remote.RemoteActorRefProvider"
        }
        remote {
          transport = "akka.remote.netty.NettyRemoteTransport"
          log-remote-lifecycle-events = off
          netty.tcp {
            hostname = "127.0.0.1"
            port = 5000
          }
        }
      }""")

    val system = ActorSystem("OTHERSYSTEM", ConfigFactory.load(config))

    // Receptionists of the two cluster nodes (master on 2551, member on 3000).
    val initialContacts = Set(
      system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:2551/user/receptionist"),
      system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:3000/user/receptionist"))

    val c = system.actorOf(ClusterClient.props(initialContacts), "os-client")

    // `!` called outside of an actor carries no sender, so the services'
    // replies would have nowhere to go. Send on behalf of a local actor
    // instead so the replies become visible.
    val replyLogger = system.actorOf(Props[ReplyLogger], "reply-logger")

    (1 to 1000).foreach { i =>
      c.tell(ClusterClient.Send("/user/master", s"hello - $i", localAffinity = true), replyLogger)
      c.tell(ClusterClient.Send("/user/member", s"hello - $i", localAffinity = true), replyLogger)
      Thread.sleep(1000) // pace the demo: one round per second
    }
  }

  /** Logs every reply that arrives for messages sent through the cluster client. */
  class ReplyLogger extends Actor with ActorLogging {
    def receive = {
      case reply =>
        log.info(s"from client : $reply : $sender")
    }
  }
}
@rkuhn
Copy link

rkuhn commented Jul 28, 2013

Replies can be received by an actor, so you have to create an actor on the client first. Then it would be most convenient to move the sending of the messages into that actor which will automatically set the right sender reference for the ! method (lines 117/118). Handle messages of type String and you will see your replies.

@Yord
Copy link

Yord commented Aug 7, 2013

Thank you very much. This saved me a lot of work!

@asethia
Copy link

asethia commented Jan 4, 2016

Excellent work... I was looking for this code. Thanks

Using Akka 2.4.x, it would require the following changes:

  1. import akka.contrib.pattern.{ClusterClient, ClusterReceptionistExtension} to import akka.cluster.client.{ClusterClient,ClusterClientReceptionist}

  2. ClusterReceptionistExtension(system).registerService(master) to ClusterClientReceptionist(system).registerService(master)

  3. val initialContacts = Set(
    system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:2551/user/receptionist"),
    system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:3000/user/receptionist")) to

    val initialContacts = Set(
    system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:2551/system/receptionist"),
    system.actorSelection("akka.tcp://ClusterSystem@127.0.0.1:3000/system/receptionist"))

@aubreylouw
Copy link

To avoid dead-letters right after the receptionist cluster is up, add akka.extensions = ["akka.cluster.client.ClusterClientReceptionist"] to the application.conf

@atatous
Copy link

atatous commented Aug 14, 2018

I needed to update this code for the latest Akka version. Also, I added code so that the ClusterClient sends its own actorRef in order to get a reply back directly, and took out DemoMember. The code below is tested with sbt.version = 0.13.15.

build.sbt
// Build definition for the cluster-client demo.
name := "ClusterClient"
version := "0.1"
scalaVersion := "2.12.4"

// Use HTTPS for the extra resolver so artifacts are not fetched over an
// unencrypted connection.
resolvers += "Typesafe Repository" at "https://repo.typesafe.com/typesafe/releases/"

// akka-cluster-tools provides ClusterClient and ClusterClientReceptionist.
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.5.14"
libraryDependencies += "com.typesafe.akka" %% "akka-cluster-tools" % "2.5.14"

import akka.actor.{Actor, ActorLogging, ActorPath, ActorSystem, Props}
import akka.cluster.client.{ClusterClient, ClusterClientSettings}
import com.typesafe.config.ConfigFactory

/**
 * Stand-alone client that talks to the cluster through a `ClusterClient`.
 *
 * Starts the `OUTSIDER-SYSTEM` actor system (remoting only, port 5000),
 * connects a `ClusterClient` to the receptionist on localhost:2551, sends the
 * reference of a local logging actor to `/user/master`, and then sends ten
 * greetings to `/user/master`, ten seconds apart.
 */
object DemoClient {

  /** Boots the client system, announces the reply actor and sends the greetings. */
  def main(args: Array[String]): Unit = {
    val config = ConfigFactory.parseString("""
     akka {
       log-dead-letters = OFF
       actor {
         provider = "akka.remote.RemoteActorRefProvider"
       }

       remote {
         transport = "akka.remote.netty.NettyRemoteTransport"
         log-remote-lifecycle-events = off
         netty.tcp {
          hostname = "localhost"
          port = 5000
         }
       }
     }""")

    val system = ActorSystem("OUTSIDER-SYSTEM", ConfigFactory.load(config))

    val initialContacts = Set(
      ActorPath.fromString("akka.tcp://ClusterSystem@localhost:2551/system/receptionist"))

    val cc = system.actorOf(
      ClusterClient.props(ClusterClientSettings(system).withInitialContacts(initialContacts)),
      "os-client")

    val ccActor = system.actorOf(Props[ClusterClientActor], "ccActor")

    // First message carries the ActorRef of the local logging actor.
    cc ! ClusterClient.Send("/user/master", ccActor, localAffinity = true)

    // foreach rather than map: the loop is run only for its side effects.
    (1 to 10).foreach { i =>
      cc ! ClusterClient.Send("/user/master", s"hello - $i", localAffinity = true)
      Thread.sleep(10000) // pace the demo: one greeting every ten seconds
    }
  }

  /** Logs every message it receives together with the message's sender. */
  class ClusterClientActor extends Actor with ActorLogging {
    def receive: Receive = {
      case e =>
        log.info(s"from cluster-client : $e : ${sender()}")
    }
  }
}


import akka.actor.{ActorRef, _}
import akka.cluster.client.ClusterClientReceptionist
import com.typesafe.config.ConfigFactory

/**
 * Cluster node that hosts the "master" service.
 *
 * Starts the `ClusterSystem` actor system on localhost:2551, which is also the
 * only seed node listed in its own configuration, creates the `master` actor
 * and registers it with the `ClusterClientReceptionist`.
 */
object DemoMaster {

  /** Boots the actor system and registers the master actor with the receptionist. */
  def main(args: Array[String]): Unit = {

    val config = ConfigFactory.parseString("""
     akka {
       log-dead-letters = OFF
       extensions = ["akka.cluster.client.ClusterClientReceptionist"]
       actor {
         provider = "akka.cluster.ClusterActorRefProvider"
       }
       remote {
         transport = "akka.remote.netty.NettyRemoteTransport"
         log-remote-lifecycle-events = off
         netty.tcp {
           hostname = "localhost"
           port = 2551
         }
       }
       cluster {
         seed-nodes = [
           "akka.tcp://ClusterSystem@localhost:2551"
           ]
         roles = [master]
         auto-down = on
       }
     }""")

    val system = ActorSystem("ClusterSystem", ConfigFactory.load(config))

    val master = system.actorOf(Props[ClusterMaster], "master")
    ClusterClientReceptionist(system).registerService(master)
  }

  /**
   * Remembers the most recently received `ActorRef` and answers every other
   * message by sending a fixed greeting to that reference. Messages that
   * arrive before any reference has been received are logged but not answered.
   */
  class ClusterMaster extends Actor with ActorLogging {
    // Reply target announced by the client; None until the first ActorRef arrives.
    private var replyTo: Option[ActorRef] = None

    def receive: Receive = {
      case a: ActorRef =>
        log.info(s"from master : $a : ${sender()}")
        replyTo = Some(a)
      case e =>
        log.info(s"from master : $e : ${replyTo.getOrElse("<none>")}")
        replyTo.foreach(_ ! "master : how are you?")
    }
  }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment