Created
October 12, 2011 18:35
-
-
Save freekh/1282114 to your computer and use it in GitHub Desktop.
n00b fault tolerant load balancing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package akka.training | |
import akka.actor.Supervisor | |
import akka.config.Supervision._ | |
import akka.actor.Actor | |
import akka.actor.Actor._ | |
import akka.routing._ | |
import akka.config.Supervision.OneForOneStrategy | |
import akka.actor.RemoteActorRef | |
import akka.actor.ActorRef | |
import akka.serialization.ActorSerialization._ | |
import BinaryFormatMyStatelessActor._ | |
import akka.util.duration._ | |
import akka.event.EventHandler | |
//getting a tuple but I just wanted to send self (just a reference but it did not work because of serialization). | |
// strange because I am quite sure I sent the actorref (self) Oh well, maybe too hungry to figure it out | |
case class IsAlive(service : String, hostname : String, port : Int) | |
case object Alive | |
class FTPrinter(service : String, hostname : String, port : Int) extends Actor { | |
def receive = { | |
case Alive => self.channel ! IsAlive(service, hostname, port) | |
case m => println("%s: %s" format (service, m)) | |
} | |
} | |
class FTLBDispatcher extends Actor { | |
val nodes = Set(("ponger", Nodes.Two._1, Nodes.Two._2), ("pinger", Nodes.Three._1, Nodes.Three._2)) | |
def remoteActorForId(id : (String, String, Int)) = remote.actorFor(id._1, id._2, id._3) | |
val all = nodes.map( id => id -> remoteActorForId(id)).toMap | |
var live = all //assuming everybody is alive, will fail if not | |
def reInitIterator = new CyclicIterator(live.values.toList) //TODO: mixin instead? | |
var iterator = reInitIterator | |
def hearbeat = { | |
val dead = all.keySet.diff(live.keySet) | |
dead.foreach { k => //check if dead are alive | |
try { | |
all(k) tryTell Alive | |
} catch { | |
case _ => | |
} | |
} | |
} | |
def receive = { | |
case IsAlive(service, hostname, port) => | |
EventHandler.debug(this, "got an alive message from a %s at %s:%s" format(service, hostname, port)) | |
live += (service, hostname, port) -> remoteActorForId((service, hostname, port)) | |
iterator = reInitIterator | |
case m => | |
EventHandler.debug(this, "got message. live now: %s" format live) | |
try { | |
iterator.next() forward m //try to send to live ones first | |
} finally { | |
hearbeat //then see if there are any new live ones | |
} | |
} | |
override def postRestart(reason : Throwable): Unit = { | |
//TODO: use an exception, e.g. ActorCrashException, as a reason and use it | |
// to remove dead from the list? | |
//if no correct replies == assume dead, will be picked up again | |
live = all.filterNot{ p => | |
try { | |
(p._2 ? Alive).await(100 millis).isCompleted | |
} catch { | |
case _ => false | |
} | |
} | |
iterator = reInitIterator | |
EventHandler.info(this, "was restarted. live now: %s" format live) | |
} | |
} | |
object Nodes { | |
val One = ("localhost", 1337) | |
val Two = ("localhost", 6164) | |
val Three = ("localhost", 1234) | |
val Four = ("localhost", 6347) | |
} | |
object ServerOne extends App { | |
val node = Nodes.One | |
remote.start(node._1, node._2) | |
val dispatcher = actorOf[FTLBDispatcher].start() | |
remote.register("dispatcher", dispatcher) | |
val supervisor = Supervisor( | |
SupervisorConfig( | |
AllForOneStrategy(List(classOf[Exception]), 1, 10), | |
Supervise( | |
dispatcher, | |
Permanent) | |
:: Nil)) | |
} | |
object ServerTwo extends App { | |
val node = Nodes.Two | |
remote.start(node._1, node._2) | |
val service = "ponger" | |
remote.register(service, actorOf(new FTPrinter(service, node._1, node._2)).start()) | |
} | |
object ServerThree extends App { | |
val node = Nodes.Three | |
remote.start(node._1, node._2) | |
val service = "pinger" | |
remote.register(service, actorOf(new FTPrinter(service, node._1, node._2)).start()) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment