Skip to content

Instantly share code, notes, and snippets.

@DBassel
Last active August 29, 2015 14:22
Show Gist options
  • Save DBassel/3f09c278cdf717482176 to your computer and use it in GitHub Desktop.
Save DBassel/3f09c278cdf717482176 to your computer and use it in GitHub Desktop.
Akka DistributedPubSubExtension multi-node cluster sample
package test.akka
import akka.actor.Actor.Receive
import akka.actor.{Props, ActorLogging, Actor, ActorSystem}
import akka.cluster.Cluster
import akka.contrib.pattern.DistributedPubSubExtension
import akka.contrib.pattern.DistributedPubSubMediator.{SubscribeAck, Subscribe, Publish}
import com.typesafe.config.ConfigFactory
/**
* Created by
* @author Basel Darvish
*/
object AkkaDistributedPubSub extends App{
val systemC1 = ActorSystem("system1", ConfigFactory.load("c1"))
val systemC2 = ActorSystem("system1", ConfigFactory.load("c2"))
Thread.sleep(10000)//waiting for cluster to initiate
systemC1.actorOf(Props(classOf[Subscriber]))
Thread.sleep(3000)//waiiting for subscriber to initiate
val publisher = systemC2.actorOf(Props(classOf[Publisher]))
publisher ! "Hey!"
}
class Subscriber extends Actor with ActorLogging {
val mediator = DistributedPubSubExtension(context.system).mediator
mediator ! Subscribe("t", self)
log info("subscribed to: {}", "t")
def receive = {
case SubscribeAck(Subscribe("t", _, `self`)) ⇒
context become ready
}
def ready: Receive = {
case s:String =>
log info("++++++++++++++++ Received message: {}", s)
case a:Any =>
log info("Received unknown message: {}", a)
}
}
class Publisher extends Actor with ActorLogging {
val mediator = DistributedPubSubExtension(context.system).mediator
override def receive: Receive = {
case s:String =>
log info("________________Publishing message: {}", s)
mediator ! Publish("t", s)
}
}
akka{
extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
netty.tcp {
hostname = "127.0.0.1"
port = 2552
}
}
cluster {
seed-nodes = [
"akka.tcp://[email protected]:2552"
"akka.tcp://[email protected]:2551"]
auto-down = on
auto-down-unreachable-after = 5s
}
}
akka{
extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
netty.tcp {
hostname = "127.0.0.1"
port = 2551
}
}
cluster {
seed-nodes = [
"akka.tcp://[email protected]:2552",
"akka.tcp://[email protected]:2551"]
auto-down = on
auto-down-unreachable-after = 5s
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment