Last active
August 29, 2015 14:02
-
-
Save visualskyrim/00d519e55399b676ae5b to your computer and use it in GitHub Desktop.
AkkaTest.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package Synapse | |
import akka.actor._ | |
import scala.concurrent.duration._ | |
import akka.util.Timeout | |
import akka.pattern.ask | |
import akka.dispatch._ | |
import scala.concurrent.{Future, Await, ExecutionContext} | |
import ExecutionContext.Implicits.global | |
import akka.dispatch.Futures.sequence | |
import akka.dispatch.Futures.future | |
import akka.routing.RoundRobinRouter | |
import akka.actor.Actor.Receive | |
import com.typesafe.config.ConfigFactory | |
object Synapse { | |
val taskMark = 10000000 | |
trait Transmitter | |
case class Stimulus(verb: String, uid: String, localStartedTime: Long) extends Transmitter | |
case class Feedback(actions: List[String], uid: String, localStartTime: Long) extends Transmitter | |
implicit val system = ActorSystem("Synapse", ConfigFactory.load.getConfig("dummy")) | |
val notifier = system.actorOf( | |
Props(new Effector(taskMark)) | |
.withDispatcher("simu-dispatcher"), | |
name="system-notifier" | |
) | |
val nodeE = system.actorOf( | |
Props(new ActionCellular(notifier, List[String]("F"))) | |
.withDispatcher("simu-dispatcher") | |
.withRouter(RoundRobinRouter(10)), | |
name="NodeE" | |
) | |
val nodeC = system.actorOf( | |
Props(new ActionCellular(notifier, List[String]("F"))) | |
.withDispatcher("simu-dispatcher") | |
.withRouter(RoundRobinRouter(10)), | |
name="NodeC" | |
) | |
val nodeB = system.actorOf( | |
Props(new ActionCellular(notifier, List[String]("D", "E"))) | |
.withDispatcher("simu-dispatcher") | |
.withRouter(RoundRobinRouter(10)), | |
name="NodeB" | |
) | |
val nodeA = system.actorOf( | |
Props(new ActionCellular(notifier, List[String]("B", "C"))) | |
.withDispatcher("simu-dispatcher") | |
.withRouter(RoundRobinRouter(10)), | |
name="NodeA" | |
) | |
val conditionReflection = Map[String, ActorRef]( | |
"A" -> nodeA, | |
"B" -> nodeB, | |
"C" -> nodeC, | |
"E" -> nodeE | |
) | |
val trans = system.actorOf( | |
Props(new TransCellular(conditionReflection)) | |
.withDispatcher("workers-dispatcher"), | |
name="default" | |
) | |
def Stimulate() = { | |
for(i <- 1 to taskMark) { | |
val localStartedTime = System.currentTimeMillis() | |
trans ! Stimulus("A", i.toString, localStartedTime) | |
} | |
println("All jobs sent") | |
} | |
class TransCellular(conditionReflect: Map[String, ActorRef]) extends Actor { | |
def receive = { | |
case Stimulus(verb, uid, start) => | |
conditionReflect.get(verb).get ! Stimulus(verb, uid, start) | |
case Feedback(actions, uid, start) => | |
actions.foreach{ | |
x => | |
if (conditionReflect.contains(x)){ | |
conditionReflect.get(x).get ! Stimulus(x, uid, start) | |
} | |
} | |
} | |
} | |
class ActionCellular(notifier: ActorRef, actions: List[String]) extends Actor { | |
def receive = { | |
case Stimulus(verb, uid, localStartTime) => | |
context.system.scheduler.scheduleOnce(500 milliseconds){ | |
//println("%s => %s : Finished!".format(verb, uid)) | |
notifier ! Feedback(actions, uid, localStartTime) | |
} | |
context.system.scheduler.scheduleOnce(500 milliseconds, sender(), Feedback(actions, uid, localStartTime)) | |
} | |
} | |
class Effector(taskMark: Int) extends Actor { | |
val startedTime = System.currentTimeMillis() | |
var count = 0 | |
val avgWaitMark = scala.collection.mutable.Map[String, Int]() | |
var currentTotalWait = 0.0 | |
def receive = { | |
case Feedback(actions, uid, localStartTime) => | |
count += 1 | |
if (count%(taskMark*4/20)==0) println("%d/%d processed".format(count, taskMark * 4)) | |
if (count == taskMark * 4) { | |
val now=System.currentTimeMillis() | |
println("Everything processed in %d seconds".format((now-startedTime)/1000)) | |
println("Avg. Wait: %f".format(currentTotalWait/taskMark)) | |
context.system.shutdown() | |
} | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
dummy { | |
akka { | |
loglevel = "DEBUG" | |
stdout-loglevel = "DEBUG" | |
log-dead-letters = 0 | |
log-dead-letters-during-shutdown = on | |
actor { | |
default-dispatcher { | |
} | |
} | |
scheduler { | |
tick-duration = 50ms | |
ticks-per-wheel = 128 | |
} | |
} | |
simu-dispatcher { | |
type = Dispatcher | |
executor = "fork-join-executor" | |
fork-join-executor { | |
parallelism-min = 0 | |
parallelism-max = 600 | |
parallelism-factor = 3.0 | |
} | |
mailbox-capacity = 100000 | |
} | |
workers-dispatcher { | |
mailbox-capacity = 10000 | |
executor = "fork-join-executor" | |
fork-join-executor { | |
parallelism-min = 0 | |
parallelism-max = 6000 | |
parallelism-factor = 3.0 | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment