Last active
May 5, 2021 21:10
-
-
Save codepr/779f5a6fd03c21446cfc to your computer and use it in GitHub Desktop.
Basic hello world cluster server
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package clusterserver | |
import akka.actor._ | |
/**
 * Minimal actor that writes every message it receives to the actor log.
 * It never replies to the sender.
 */
class Logger extends Actor with ActorLogging {

  // Runs as part of construction, so each new instance announces itself once.
  log.info("Logger started!")

  /** Accepts any message and logs it at info level. */
  override def receive: Receive = {
    case anything => log.info("Got msg: {}", anything)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Akka release shared by every Akka module below, so the artifacts cannot drift apart.
val akkaVersion = "2.4.2"

name := "cluster-server"

version := "1.0"

scalaVersion := "2.11.8"

// akka-actor for the actor runtime, akka-cluster for membership and cluster-aware routers.
libraryDependencies ++= Seq("akka-actor", "akka-cluster").map { module =>
  "com.typesafe.akka" %% module % akkaVersion
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package clusterserver | |
import akka.actor._ | |
import akka.routing._ | |
import akka.cluster._ | |
import akka.cluster.routing._ | |
import akka.io.{ IO, Tcp } | |
import akka.util.ByteString | |
import java.net.InetSocketAddress | |
import com.typesafe.config.ConfigFactory | |
/**
 * Server actor listening for connections on a random free port.
 * This is needed in order to run a simulated cluster on a single machine,
 * because every instance has to bind its server socket to a different port.
 * In a real distributed system we could pick a single
 * fixed port and bind every instance to it.
 *
 * @author: Andrea Giacomo Baldan
 * @version: 1.0
 * @date: 2016/03/26
 */
class Server(ref: ActorRef) extends Actor with ActorLogging {
  import Tcp._
  import context.system // implicit ActorSystem required by IO(Tcp)

  // Port 0 asks the operating system to pick any free port; the port that was
  // actually chosen is reported back in the Bound message handled below.
  IO(Tcp) ! Bind(self, new InetSocketAddress("127.0.0.1", 0))

  /**
   * Handles the listener life-cycle:
   *  - Bound: prints the address the server socket ended up on;
   *  - CommandFailed(Bind): logs the failure and stops this actor;
   *  - Connected: spawns a LoggerHandler child for the new connection, registers
   *    it with the connection actor and writes a greeting to the client.
   */
  def receive = {
    case Bound(localAddress) =>
      println(s"[*] ${localAddress.getHostString} listening on port ${localAddress.getPort}")

    case CommandFailed(bind: Bind) =>
      // Do not die silently: without a bound socket this actor is useless.
      log.error("Unable to bind server socket to {}, stopping", bind.localAddress)
      context stop self

    case Connected(remote, local) =>
      println(s"[*] Connection received from ${remote.getHostString}")
      // The sender of Connected is the actor representing this TCP connection.
      val connection = sender()
      // `ref` is handed to the handler, which forwards client commands to it.
      val handler = context.actorOf(Props(new LoggerHandler(remote, ref)))
      connection ! Register(handler)
      connection ! Write(ByteString(s"\n\n[*] Hello from ${local.getHostString}:${local.getPort}\n\n"))
  }
}
/**
 * Handler created for every incoming client. It is responsible for the entire life-cycle
 * of that client's connection and handles all commands sent by the user.
 * It understands the "single", "multiple" and "quit" commands, which exist purely for simple cluster testing.
 */
class LoggerHandler(reference: InetSocketAddress, ref: ActorRef) extends Actor with ActorLogging {
  import Tcp._

  // Commands are compared against the exact bytes of a received chunk, so a
  // command is only recognised when it arrives in one piece and ends with CRLF.
  val SINGLE = ByteString("single\r\n")
  val MULTIPLE = ByteString("multiple\r\n")
  val QUIT = ByteString("quit\r\n")

  /**
   * Dispatches client input:
   *  - "single"   sends the number 1 to `ref`;
   *  - "multiple" sends the numbers 1 to 10 to `ref`;
   *  - "quit"     asks the connection actor to close the connection;
   *  - anything else is answered with "hello".
   * The handler stops itself as soon as the connection is reported closed,
   * whatever the reason.
   */
  def receive = {
    case Received(data) =>
      data match {
        case SINGLE => ref ! 1
        case MULTIPLE => (1 to 10).foreach(ref ! _)
        // Close the connection for real; the resulting Closed event is a
        // ConnectionClosed and stops this handler through the case below.
        case QUIT => sender() ! Close
        case _ => sender() ! Write(ByteString("hello"))
      }

    // Covers PeerClosed as well as Closed, Aborted, ErrorClosed, etc., so the
    // handler never outlives its connection.
    case closed: ConnectionClosed =>
      log.debug("Connection from {} closed ({}), stopping handler", reference, closed)
      context stop self
  }
}
/**
 * Sets a single seed node at localhost:2500, for simulation purposes only.
 * Play with the router and round-robin settings to change the
 * behaviour of the router and its routees.
 *
 * Run instructions:
 * To see the load actually being balanced between two simulated nodes on a single local machine,
 * first start one instance of the server with its remoting bound to TCP port 2500, so that it
 * acts as the seed node.
 * NODE A(e.g. shell A): sbt run -Dakka.remote.netty.tcp.port=2500
 * NODE B(e.g. shell B): sbt run -Dakka.remote.netty.tcp.port=0
 */
object TestSystem extends App {
  // The actor system name must match the one used in the seed-node address,
  // so both are derived from this single value.
  private val systemName = "DumbSystem"

  // Enables the cluster actor-ref provider and points every instance at the
  // one seed node expected on 127.0.0.1:2500.
  val config = ConfigFactory.parseString(s"""
    akka.actor.provider=akka.cluster.ClusterActorRefProvider
    akka.cluster.seed-nodes = ["akka.tcp://${systemName}@127.0.0.1:2500"]
  """)
  val system = ActorSystem(systemName, config)

  // The router and the TCP server are only created once this node has been
  // marked as Up in the cluster.
  Cluster(system).registerOnMemberUp {
    val roundRobinPool = RoundRobinPool(nrOfInstances = 10)
    // At most 10 Logger routees cluster-wide, at most 5 per node; the local
    // node may host routees and no role restriction is applied.
    val clusterRoutingSettings = ClusterRouterPoolSettings(
      totalInstances = 10,
      maxInstancesPerNode = 5,
      allowLocalRoutees = true,
      useRole = None)
    val clusterPool = ClusterRouterPool(roundRobinPool, clusterRoutingSettings)
    val router = system.actorOf(clusterPool.props(Props[Logger]))
    // The server hands the router to each connection handler as the target
    // for client commands.
    system.actorOf(Props(new Server(router)))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment