-
-
Save pisfly/53d207b98eb5dcd3dfa7628d144678ed to your computer and use it in GitHub Desktop.
Example of how to implement a simple cluster-wide actor registry.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.contrib.pattern | |
import language.postfixOps | |
import scala.concurrent.duration._ | |
import com.typesafe.config.ConfigFactory | |
import akka.actor.Actor | |
import akka.actor.ActorRef | |
import akka.actor.Address | |
import akka.actor.Props | |
import akka.actor.PoisonPill | |
import akka.actor.Terminated | |
import akka.cluster.Cluster | |
import akka.cluster.ClusterEvent._ | |
import akka.cluster.Member | |
import akka.cluster.MemberStatus | |
import akka.remote.testconductor.RoleName | |
import akka.remote.testkit.MultiNodeConfig | |
import akka.remote.testkit.MultiNodeSpec | |
import akka.remote.testkit.STMultiNodeSpec | |
import akka.testkit._ | |
/**
 * Multi-node test configuration plus the actors exercised by the spec.
 *
 * Declares the three roles taking part in the test, the configuration shared
 * by all of them, and two actors: `TestChatUser` (a named user created by a
 * registry) and `ClusterRegistry` (the registry itself, one instance per node).
 */
object ClusterRegistrySpec extends MultiNodeConfig {
  val first = role("first")
  val second = role("second")
  val third = role("third")

  // Automatic joining is switched off; the spec joins the nodes explicitly.
  commonConfig(ConfigFactory.parseString("""
    akka.loglevel = INFO
    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
    akka.remote.log-remote-lifecycle-events = off
    akka.cluster.auto-join = off
    akka.cluster.auto-down = on
    """))

  object TestChatUser {
    /** Asks a user to deliver `msg` to the user registered under the name `to`. */
    case class Talk(to: String, msg: Any)
  }

  /**
   * A user actor. A `Talk` is converted into a `ClusterRegistry.Send` and handed
   * to the parent actor; every other message is passed on to `testActor`.
   */
  class TestChatUser(testActor: ActorRef) extends Actor {
    import TestChatUser._
    import ClusterRegistry._

    def receive: Receive = {
      case Talk(to, msg) ⇒ context.parent ! Send(to, msg)
      case msg           ⇒ testActor ! msg
    }
  }

  object ClusterRegistry {
    /** Create a child actor from `props` called `name`; the reply is its `ActorRef`. */
    case class CreateActor(props: Props, name: String)

    /** Forward `msg` to the actor registered under `to`, if there is one. */
    case class Send(to: String, msg: Any)

    /** Ask for the number of registered actors; the reply is an `Int`. */
    case object GetCount
  }

  /**
   * Registry actor that maps actor names to `ActorRef`s.
   *
   * Actors created here are children of this actor. Their refs are also sent to
   * the actor with the same path on every other known node, so each registry
   * ends up holding both its own children and the refs it has been sent.
   */
  class ClusterRegistry extends Actor {
    import ClusterRegistry._

    val cluster = Cluster(context.system)

    /** Name -> ref for local children and for refs received from other registries. */
    var registry: Map[String, ActorRef] = Map.empty

    /** Addresses of the cluster members that new refs are replicated to. */
    var nodes: Set[Address] = Set.empty

    override def preStart(): Unit = {
      cluster.subscribe(self, classOf[MemberEvent])
      cluster.subscribe(self, classOf[UnreachableMember])
    }

    // Side-effecting lifecycle hook: declared with `()` like `preStart()` above.
    override def postStop(): Unit =
      cluster.unsubscribe(self)

    def receive: Receive = {
      case Send(to, msg) ⇒
        // Unknown names are dropped silently.
        registry.get(to).foreach(_.forward(msg))

      case GetCount ⇒
        sender ! registry.size

      case CreateActor(props, name) ⇒
        val created = context.watch(context.actorOf(props, name))
        registry += (name -> created)
        sender ! created
        nodes.foreach(replicate(created, _))

      case ref: ActorRef ⇒
        // A ref replicated from another registry: watch it so it is removed on termination.
        context.watch(ref)
        registry += (ref.path.name -> ref)

      case state: CurrentClusterState ⇒
        nodes = state.members.map(_.address)
        state.members.foreach { member ⇒
          if (member.status == MemberStatus.Up) replicateAll(member.address)
        }

      // Must stay ahead of the generic MemberEvent case below, which would otherwise match it.
      case MemberUp(member) ⇒
        nodes += member.address
        replicateAll(member.address)

      case memberEvent: MemberEvent ⇒
        // Any member event other than MemberUp removes the member as a replication target.
        nodes -= memberEvent.member.address

      case UnreachableMember(member) ⇒
        nodes -= member.address

      case Terminated(ref) ⇒
        registry -= ref.path.name
    }

    /**
     * Send my children to the registry at the other node.
     * Nothing is sent when `address` is this node's own address.
     */
    def replicateAll(address: Address): Unit =
      if (address != cluster.selfAddress) {
        val peer = otherRegistry(address)
        context.children.foreach(child ⇒ peer ! child)
      }

    /** Send a single ref to the registry at `address`, unless that is this node. */
    def replicate(ref: ActorRef, address: Address): Unit =
      if (address != cluster.selfAddress)
        otherRegistry(address) ! ref

    /** Looks up the actor that has this actor's path on the node at `address`. */
    def otherRegistry(address: Address): ActorRef =
      context.actorFor(self.path.toStringWithAddress(address))
  }
}
// One concrete subclass of the spec per participating node (three roles are
// declared in the ClusterRegistrySpec object).
// NOTE(review): presumably discovered by the multi-JVM test runner through the
// `MultiJvmNodeN` naming pattern — confirm against the build setup.
class ClusterRegistryMultiJvmNode1 extends ClusterRegistrySpec
class ClusterRegistryMultiJvmNode2 extends ClusterRegistrySpec
class ClusterRegistryMultiJvmNode3 extends ClusterRegistrySpec
/**
 * Multi-node spec for the `ClusterRegistry` example actor.
 *
 * The tests run in order and build on each other: a two node cluster is
 * formed, users are created and looked up across nodes, a third node joins and
 * receives the existing users, and finally a stopped user is removed from all
 * registries.
 */
class ClusterRegistrySpec extends MultiNodeSpec(ClusterRegistrySpec) with STMultiNodeSpec with ImplicitSender {
  import ClusterRegistrySpec._
  import ClusterRegistrySpec.ClusterRegistry._
  import ClusterRegistrySpec.TestChatUser._

  override def initialParticipants: Int = roles.size

  /**
   * On node `from`: join the cluster via node `to` and start the local registry.
   * All nodes then meet at the "<from>-joined" barrier.
   */
  def join(from: RoleName, to: RoleName): Unit = {
    runOn(from) {
      Cluster(system) join node(to).address
      createRegisty()
    }
    enterBarrier(from.name + "-joined")
  }

  /** Starts the registry as a top level actor named "registry". (Method name spelling kept as is.) */
  def createRegisty(): ActorRef =
    system.actorOf(Props[ClusterRegistry], name = "registry")

  /** Looks up the registry actor of the local actor system. */
  def registry: ActorRef = system.actorFor("user/registry")

  "A ClusterRegistry" must {

    "startup 2 node cluster" in {
      join(first, first)
      join(second, first)
      enterBarrier("after-1")
    }

    "keep track of added users" in within(15 seconds) {
      val r = registry

      runOn(first) {
        r ! CreateActor(Props(new TestChatUser(testActor)), "u1")
        val u1 = expectMsgType[ActorRef]
        r ! CreateActor(Props(new TestChatUser(testActor)), "u2")
        val u2 = expectMsgType[ActorRef]

        // talk to user at same node
        u1 ! Talk("u2", "hello")
        expectMsg("hello")
        lastSender must be(u2)
      }

      runOn(second) {
        r ! CreateActor(Props(new TestChatUser(testActor)), "u3")
        expectMsgType[ActorRef]
      }

      runOn(first, second) {
        awaitCond {
          r ! GetCount
          expectMsgType[Int] == 3
        }
      }
      enterBarrier("3-registered")

      runOn(second) {
        r ! CreateActor(Props(new TestChatUser(testActor)), "u4")
        expectMsgType[ActorRef]
      }

      runOn(first, second) {
        awaitCond {
          r ! GetCount
          expectMsgType[Int] == 4
        }
      }
      enterBarrier("4-registered")

      runOn(first) {
        // talk to user on another node
        system.actorFor("/user/registry/u1") ! Talk("u4", "hi there")
      }

      runOn(second) {
        expectMsg("hi there")
        lastSender.path.name must be("u4")
      }

      enterBarrier("after-2")
    }

    "replicate users to new node" in within(20 seconds) {
      join(third, first)
      val r = registry

      runOn(third) {
        r ! CreateActor(Props(new TestChatUser(testActor)), "u5")
        expectMsgType[ActorRef]
      }

      awaitCond {
        r ! GetCount
        expectMsgType[Int] == 5
      }
      enterBarrier("5-registered")

      runOn(third) {
        system.actorFor("/user/registry/u5") ! Talk("u4", "go")
      }

      runOn(second) {
        expectMsg("go")
        lastSender.path.name must be("u4")
      }

      enterBarrier("after-3")
    }

    "remove terminated users" in within(5 seconds) {
      val r = registry

      runOn(second) {
        system.actorFor("/user/registry/u3") ! PoisonPill
      }

      // Five users (u1..u5) were registered before u3 was stopped, so every
      // registry must shrink to four. Waiting for 5 would be satisfied by the
      // state *before* the removal and would verify nothing.
      awaitCond {
        r ! GetCount
        expectMsgType[Int] == 4
      }

      enterBarrier("after-4")
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment