Skip to content

Instantly share code, notes, and snippets.

@metamorph
Created January 16, 2012 15:20
Show Gist options
  • Save metamorph/1621339 to your computer and use it in GitHub Desktop.
Save metamorph/1621339 to your computer and use it in GitHub Desktop.
Dynamic remote deployment in Akka
package ring
import akka.actor._
import akka.routing.NoRouter
import akka.remote.{RemoteScope, UnparsedTransportAddress, UnparsedSystemAddress}
/**
// application.conf:
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
}
remote {
transport = "akka.remote.netty.NettyRemoteSupport"
server {
hostname = "127.0.0.1"
port = 2552
}
}
}
*/
// Messages

// Host/port pair naming the remote system an actor should be spawned on.
case class Address(
  host: String,
  port: Int
)

// Sent to the RingMaster to inject the first Packet into the ring.
case object Begin

// Token passed around the ring.
//   lapsCompleted - laps finished so far
//   hops          - number of forwards so far
//   createdAt     - creation timestamp in epoch milliseconds; defaults to "now"
case class Packet(
  lapsCompleted: Int = 0,
  hops: Int = 0,
  createdAt: Long = System.currentTimeMillis()
)
// Tells a Node which actor comes next in the ring. The Node stores the
// reference and forwards every Packet it receives afterwards to that actor.
case class NodeSetup(nextNode: ActorRef)
// One link of the ring.
// The actor starts out handling only NodeSetup; once it knows its successor it
// switches to the `active` behaviour and relays every Packet it receives.
class Node extends Actor with ActorLogging {

  // Successor in the ring; empty until a NodeSetup message has been handled.
  var nextNode: Option[ActorRef] = None

  override def preStart(): Unit =
    log.info("Starting actor ...")

  // Initial behaviour: remember the successor, then switch to `active`.
  protected def receive = {
    case NodeSetup(successor) =>
      nextNode = Some(successor)
      context.become(active)
  }

  // Relay behaviour: log the packet and pass it on with one more hop counted.
  def active: Receive = {
    case packet: Packet =>
      log.info("Packet received: '{}'", packet)
      nextNode.foreach { successor =>
        successor ! packet.copy(hops = packet.hops + 1)
      }
  }
}
// Mixin for actors that create their children on another cluster node.
// spawn() first registers a remote deployment for the child's path with the
// actor system's deployer and then creates the child via context.actorOf.
trait RemoteSpawner { self: Actor =>

  // Deployer of the hosting actor system, reached by casting to ActorSystemImpl.
  val deployer = context.system.asInstanceOf[ActorSystemImpl].provider.deployer

  // Abstract method: the host/port of the system the next child is spawned on.
  def calculateSpawnAddress: Address

  // Registers a remote deployment for child `name` and creates the child.
  // Returns whatever context.actorOf returns for the new child.
  def spawn(props: Props, name: String) = {
    val target = calculateSpawnAddress

    // "Local" path of the child: this actor's path elements minus the first one,
    // slash-separated, followed by the child's name.
    val parentPath = context.self.path.elements.tail.mkString("/", "/", "/")
    val childPath = parentPath + name

    // Dynamic deployment definition pointing at the target system.
    val transport = UnparsedTransportAddress("akka", target.host, target.port)
    val remoteSystem = UnparsedSystemAddress(Some(context.system.name), transport)
    val deployment = Deploy(childPath, null, None, NoRouter, RemoteScope(remoteSystem))
    deployer.deploy(deployment)

    // Create the actor; the deployment registered above applies to its path.
    context.actorOf(props, name)
  }
}
// The root node of the actor-ring.
// On start it spawns `numberOfNodes - 1` child Nodes through RemoteSpawner and
// links them into a chain that ends at this actor. A Begin message injects the
// first Packet; each time a Packet comes back, one lap is counted, and after
// `numberOfLaps` laps the actor stops itself.
class RingMaster(numberOfNodes: Int, numberOfLaps: Int) extends Actor with ActorLogging with RemoteSpawner {

  // First node of the chain; assigned in preStart.
  var startingNode: Option[ActorRef] = None

  // Mocked impl that will spawn actors on the 'RingServer' process.
  // The host has to be spelled exactly like the hostname the remote system is
  // bound to ("127.0.0.1" in application.conf). With "localhost" the receiving
  // system treats itself as a different address and drops every message as
  // "for non-local recipient akka://sys@localhost:2553/...".
  def calculateSpawnAddress = Address("127.0.0.1", 2553)

  override def preStart() {
    log.info("Setting up {} child-nodes ...", numberOfNodes - 1)
    // Setup the ring.
    (1 until numberOfNodes).foreach { index =>
      spawn(Props[Node], "node-" + index)
    }
    // left-fold over all nodes to setup the chain in reverse: every child is told
    // to forward to the previously visited one, the first child forwards to this
    // actor, and the last child visited becomes the entry point of the ring.
    startingNode = Some(context.children.foldLeft(self) { (nextNode, currentNode) =>
      currentNode.tell(NodeSetup(nextNode))
      currentNode
    })
  }

  protected def receive = {
    // Kick off the first lap.
    case Begin => startingNode.foreach(_ ! Packet())

    // A packet has travelled the whole ring and arrived back here.
    case pkg @ Packet(lapsCompleted, hops, createdAt) => {
      val elapsed = System.currentTimeMillis() - createdAt
      log.info("Package completed one lap ({} hops) in {} ms: {}", numberOfNodes, elapsed, pkg)
      if (lapsCompleted + 1 < numberOfLaps)
        // Next lap: createdAt is left to its default, so the timer restarts per lap.
        startingNode.foreach(_ ! Packet(lapsCompleted + 1, hops + 1))
      else {
        log.info("Max number of laps reached - shutting down")
        self ! PoisonPill
      }
    }
  }
}
// Entry point of the process that hosts the RingMaster and starts the ring.
object RunRing extends App {
  val system = ActorSystem("sys")

  // A ring of five actors in total (the master plus four spawned nodes), two laps.
  val master = system.actorOf(
    Props(new RingMaster(numberOfNodes = 5, numberOfLaps = 2)),
    "ringMaster")

  master ! Begin

  // Block until the user presses Enter, then stop the actor system.
  Console.readLine("\nPress Enter to shut down ...\n")
  system.shutdown()
}
// Entry point of the second process: an actor system named "sys" started after
// the remote server port property has been set to 2553.
object RingServer extends App {
  // The port property is set before the actor system is created.
  System.setProperty("akka.remote.server.port", "2553")

  val system = ActorSystem("sys")

  // Block until the user presses Enter, then stop the actor system.
  Console.readLine("\nPress Enter to shut down ...\n")
  system.shutdown()
}
[info] Running com.bossmedia.ring.RingServer
[INFO] [01/16/2012 16:18:02.786] [run-main] [ActorSystem] REMOTE: RemoteServerStarted@akka://sys@127.0.0.1:2553
[INFO] [01/16/2012 16:18:02.855] [run-main] [Remote] Starting remote server on [akka://sys@127.0.0.1:2553] with node name [default]
Press Enter to shut down ...
[INFO] [01/16/2012 16:18:09.521] [pool-2-thread-3] [ActorSystem] REMOTE: RemoteClientStarted@akka://127.0.0.1:2552
[ERROR] [01/16/2012 16:18:09.715] [pool-2-thread-3] [NettyRemoteSupport] dropping message DaemonMsgCreate(<function0>,akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-1,Actor[akka://sys@127.0.0.1:2552/user/ringMaster]) for non-local recipient akka://sys@localhost:2553/remote
[ERROR] [01/16/2012 16:18:09.722] [pool-2-thread-3] [NettyRemoteSupport] dropping message DaemonMsgCreate(<function0>,akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-2,Actor[akka://sys@127.0.0.1:2552/user/ringMaster]) for non-local recipient akka://sys@localhost:2553/remote
[ERROR] [01/16/2012 16:18:09.728] [pool-2-thread-3] [NettyRemoteSupport] dropping message DaemonMsgCreate(<function0>,akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-3,Actor[akka://sys@127.0.0.1:2552/user/ringMaster]) for non-local recipient akka://sys@localhost:2553/remote
[ERROR] [01/16/2012 16:18:09.734] [pool-2-thread-3] [NettyRemoteSupport] dropping message DaemonMsgCreate(<function0>,akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-4,Actor[akka://sys@127.0.0.1:2552/user/ringMaster]) for non-local recipient akka://sys@localhost:2553/remote
[ERROR] [01/16/2012 16:18:09.749] [pool-2-thread-3] [NettyRemoteSupport] dropping message NodeSetup(Actor[akka://sys@127.0.0.1:2552/user/ringMaster]) for non-local recipient akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-1
[ERROR] [01/16/2012 16:18:09.753] [pool-2-thread-3] [NettyRemoteSupport] dropping message NodeSetup(Actor[akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-1]) for non-local recipient akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-2
[ERROR] [01/16/2012 16:18:09.757] [pool-2-thread-3] [NettyRemoteSupport] dropping message NodeSetup(Actor[akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-2]) for non-local recipient akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-3
[ERROR] [01/16/2012 16:18:09.761] [pool-2-thread-3] [NettyRemoteSupport] dropping message NodeSetup(Actor[akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-3]) for non-local recipient akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-4
[ERROR] [01/16/2012 16:18:09.772] [pool-2-thread-3] [NettyRemoteSupport] dropping message Packet(0,0,1326727089679) for non-local recipient akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-4
[ERROR] [01/16/2012 16:23:51.430] [pool-2-thread-4] [NettyRemoteSupport] dropping message Terminate() for non-local recipient akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-1
[ERROR] [01/16/2012 16:23:51.432] [pool-2-thread-4] [NettyRemoteSupport] dropping message Terminate() for non-local recipient akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-2
[ERROR] [01/16/2012 16:23:51.434] [pool-2-thread-4] [NettyRemoteSupport] dropping message Terminate() for non-local recipient akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-3
[ERROR] [01/16/2012 16:23:51.436] [pool-2-thread-4] [NettyRemoteSupport] dropping message Terminate() for non-local recipient akka://sys@localhost:2553/remote/sys@127.0.0.1:2552/user/ringMaster/node-4
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment