-
-
Save metamorph/1621339 to your computer and use it in GitHub Desktop.
Dynamic remote deployment in Akka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package ring | |
import akka.actor._ | |
import akka.routing.NoRouter | |
import akka.remote.{RemoteScope, UnparsedTransportAddress, UnparsedSystemAddress} | |
/**
// application.conf:
akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
  }
  remote {
    transport = "akka.remote.netty.NettyRemoteSupport"
    server {
      hostname = "127.0.0.1"
      port = 2552
    }
  }
}
*/
// Messages

// Host/port pair naming the remote system a child actor should be spawned on.
case class Address(
  host: String,
  port: Int)

// Sent to the ring master to inject the first packet into the ring.
case object Begin

// The token passed around the ring.
//   lapsCompleted - number of full laps finished so far (defaults to 0)
//   hops          - number of forwards so far (defaults to 0)
//   createdAt     - creation timestamp in epoch millis (defaults to "now")
case class Packet(
  lapsCompleted: Int = 0,
  hops: Int = 0,
  createdAt: Long = System.currentTimeMillis())
// Tells a ring node which actor it must forward packets to.
case class NodeSetup(
  nextNode: ActorRef)
// A node in the ring.
//
// Starts out only understanding NodeSetup; once it has been told its successor it
// switches to the `active` behaviour and forwards every Packet it receives.
class Node extends Actor with ActorLogging {

  // Successor in the ring; populated by the NodeSetup message.
  var nextNode: Option[ActorRef] = None

  override def preStart(): Unit =
    log.info("Starting actor ...")

  // Initial behaviour: wait for the successor, then become active.
  protected def receive = {
    case NodeSetup(successor) =>
      nextNode = Some(successor)
      context.become(active)
  }

  // Active behaviour: log the packet and pass it on with the hop count incremented.
  def active: Receive = {
    case packet: Packet =>
      log.info("Packet received: '{}'", packet)
      val forwarded = packet.copy(hops = packet.hops + 1)
      nextNode.foreach(_ ! forwarded)
  }
}
// Trait that spawns child-actors on another cluster node.
//
// Mixed into an Actor; `spawn` first registers a remote-scoped deployment for the
// child's path with the system's deployer and then creates the child normally.
trait RemoteSpawner { self: Actor =>

  // Deployer of the running actor system (reached through the ActorSystemImpl cast).
  val deployer = context.system.asInstanceOf[ActorSystemImpl].provider.deployer

  // Abstract method to lookup the address/port to spawn an actor on.
  def calculateSpawnAddress: Address

  // Creates a child actor called `name` from `props`, after installing a deployment
  // whose scope points at the host/port returned by `calculateSpawnAddress`.
  def spawn(props: Props, name: String) = {
    val target = calculateSpawnAddress

    // Deployment path: this actor's path without its first element, plus the child name.
    // NOTE(review): presumably the dropped element is the "user" guardian segment - confirm.
    val parentSegments = context.self.path.elements.tail
    val childPath = parentSegments.mkString("/", "/", "/" + name)

    // Describe the remote system: same system name, "akka" transport, target host/port.
    val transport = UnparsedTransportAddress("akka", target.host, target.port)
    val remoteSystem = UnparsedSystemAddress(Some(context.system.name), transport)

    // Install the (dynamic) deployment definition for the actor.
    deployer.deploy(Deploy(childPath, null, None, NoRouter, RemoteScope(remoteSystem)))

    // Create the actor.
    context.actorOf(props, name)
  }
}
// The root node of the actor-ring.
//
// On start it spawns `numberOfNodes - 1` child nodes through RemoteSpawner and chains
// them together so that the last link points back at this actor. A Begin message
// injects the first Packet; every Packet that arrives back here counts as one lap.
//   numberOfNodes - total ring size, including this master
//   numberOfLaps  - laps to run before the master stops itself
class RingMaster(numberOfNodes: Int, numberOfLaps: Int) extends Actor with ActorLogging with RemoteSpawner {

  // Entry point of the ring; set in preStart once the chain has been wired.
  var startingNode: Option[ActorRef] = None

  // Mocked impl that will spawn actors on the 'RingServer' process.
  // The host must be spelled exactly like the hostname the remote server is configured
  // with ("127.0.0.1" in application.conf). With "localhost" the receiving system logged
  // "dropping message ... for non-local recipient akka://sys@localhost:2553/remote" and
  // no child was ever created.
  def calculateSpawnAddress = Address("127.0.0.1", 2553)

  override def preStart() {
    log.info("Setting up {} child-nodes ...", numberOfNodes - 1)
    // Setup the ring.
    1.to(numberOfNodes - 1).foreach { index =>
      val childName = "node-"+index
      spawn(Props[Node], childName)
    }
    // left-fold over all nodes to setup the chain in reverse: each child is told to
    // forward to the previously visited one, the first visited child forwards to self.
    startingNode = Some(context.children.foldLeft(self){ (nextNode, currentNode) =>
      currentNode.tell(NodeSetup(nextNode))
      currentNode
    })
  }

  protected def receive = {
    // Kick off the first lap.
    case Begin => startingNode.foreach(_ ! Packet())

    // A packet made it all the way around the ring.
    case pkg@Packet(lapsCompleted, hops, createdAt) => {
      val elapsed = System.currentTimeMillis() - createdAt
      log.info("Package completed one lap ({} hops) in {} ms: {}", numberOfNodes, elapsed, pkg)
      if (lapsCompleted + 1 < numberOfLaps)
        // Start the next lap; createdAt is left at its default so the timer restarts per lap.
        startingNode.foreach(_ ! Packet(lapsCompleted + 1, hops + 1))
      else {
        log.info("Max number of laps reached - shutting down")
        self ! PoisonPill
      }
    }
  }
}
// Entry point of the master process: starts the "sys" actor system, creates the
// ring master and triggers the first lap.
object RunRing extends App {
  private val ringSize = 5
  private val laps = 2

  val system = ActorSystem("sys")

  val master = system.actorOf(
    Props(new RingMaster(numberOfNodes = ringSize, numberOfLaps = laps)),
    "ringMaster")

  master ! Begin

  // Block until the user presses Enter, then stop the actor system.
  readLine("\nPress Enter to shut down ...\n")
  system.shutdown()
}
// Entry point of the second process: an otherwise empty "sys" actor system whose
// remote server port is overridden to 2553 before the system is created.
object RingServer extends App {
  private val portProperty = "akka.remote.server.port"
  private val serverPort = "2553"

  // Must be set before the ActorSystem below is created.
  System.setProperty(portProperty, serverPort)

  val system = ActorSystem("sys")

  // Block until the user presses Enter, then stop the actor system.
  readLine("\nPress Enter to shut down ...\n")
  system.shutdown()
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[info] Running com.bossmedia.ring.RingServer
[INFO] [01/16/2012 16:18:02.786] [run-main] [ActorSystem] REMOTE: RemoteServerStarted@akka://sys@127.0.0.1:2553
[INFO] [01/16/2012 16:18:02.855] [run-main] [Remote] Starting remote server on [akka://sys@127.0.0.1:2553] with node name [default]
Press Enter to shut down ...
[INFO] [01/16/2012 16:18:09.521] [pool-2-thread-3] [ActorSystem] REMOTE: RemoteClientStarted@akka://127.0.0.1:2552
[ERROR] [01/16/2012 16:18:09.715] [pool-2-thread-3] [NettyRemoteSupport] dropping message DaemonMsgCreate(<function0>,akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-1,Actor[akka://sys@127.0.0.1:2552/user/ringMaster]) for non-local recipient akka://sys@localhost:2553/remote
[ERROR] [01/16/2012 16:18:09.722] [pool-2-thread-3] [NettyRemoteSupport] dropping message DaemonMsgCreate(<function0>,akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-2,Actor[akka://sys@127.0.0.1:2552/user/ringMaster]) for non-local recipient akka://sys@localhost:2553/remote
[ERROR] [01/16/2012 16:18:09.728] [pool-2-thread-3] [NettyRemoteSupport] dropping message DaemonMsgCreate(<function0>,akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-3,Actor[akka://sys@127.0.0.1:2552/user/ringMaster]) for non-local recipient akka://sys@localhost:2553/remote
[ERROR] [01/16/2012 16:18:09.734] [pool-2-thread-3] [NettyRemoteSupport] dropping message DaemonMsgCreate(<function0>,akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-4,Actor[akka://sys@127.0.0.1:2552/user/ringMaster]) for non-local recipient akka://sys@localhost:2553/remote
[ERROR] [01/16/2012 16:18:09.749] [pool-2-thread-3] [NettyRemoteSupport] dropping message NodeSetup(Actor[akka://sys@127.0.0.1:2552/user/ringMaster]) for non-local recipient akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-1
[ERROR] [01/16/2012 16:18:09.753] [pool-2-thread-3] [NettyRemoteSupport] dropping message NodeSetup(Actor[akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-1]) for non-local recipient akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-2
[ERROR] [01/16/2012 16:18:09.757] [pool-2-thread-3] [NettyRemoteSupport] dropping message NodeSetup(Actor[akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-2]) for non-local recipient akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-3
[ERROR] [01/16/2012 16:18:09.761] [pool-2-thread-3] [NettyRemoteSupport] dropping message NodeSetup(Actor[akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-3]) for non-local recipient akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-4
[ERROR] [01/16/2012 16:18:09.772] [pool-2-thread-3] [NettyRemoteSupport] dropping message Packet(0,0,1326727089679) for non-local recipient akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-4
[ERROR] [01/16/2012 16:23:51.430] [pool-2-thread-4] [NettyRemoteSupport] dropping message Terminate() for non-local recipient akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-1
[ERROR] [01/16/2012 16:23:51.432] [pool-2-thread-4] [NettyRemoteSupport] dropping message Terminate() for non-local recipient akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-2
[ERROR] [01/16/2012 16:23:51.434] [pool-2-thread-4] [NettyRemoteSupport] dropping message Terminate() for non-local recipient akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-3
[ERROR] [01/16/2012 16:23:51.436] [pool-2-thread-4] [NettyRemoteSupport] dropping message Terminate() for non-local recipient akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-4
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment