Created
March 6, 2013 23:05
-
-
Save hoffrocket/5104055 to your computer and use it in GitHub Desktop.
Distributed Applicaiton Simulator (may be buggy and/or completely incorrect)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object QueueSim { | |
class QueuedProcessor extends Processor { | |
var lastClock: Int = 0 | |
var remainingWork: Int = 0 | |
def advanceClock(currentTime: Int) { | |
remainingWork = math.max(0, remainingWork - (currentTime - lastClock)) | |
lastClock = currentTime | |
} | |
def process(request: Request): Response = { | |
remainingWork += request.time | |
Response(remainingWork - request.time, request) | |
} | |
} | |
trait Processor { | |
def advanceClock(currentTime: Int): Unit | |
def process(request: Request): Response | |
} | |
case class Worker(id: Int, concurrency: Int) extends Processor { | |
val workerThreads: Seq[QueuedProcessor] = (1 to concurrency).map(_ => new QueuedProcessor()) | |
def advanceClock(currentTime: Int) { | |
workerThreads.foreach(_.advanceClock(currentTime)) | |
} | |
def process(request: Request): Response = { | |
var leastUtilizedThread: QueuedProcessor = null | |
workerThreads.foreach{ t => | |
if (leastUtilizedThread == null || t.remainingWork < leastUtilizedThread.remainingWork ) { | |
leastUtilizedThread = t | |
} | |
} | |
leastUtilizedThread.process(request) | |
} | |
} | |
case class Request(time: Int) | |
case class Response(queueTime: Int, request: Request) { | |
def totalTime = queueTime + request.time | |
} | |
case class RoundRobinProcessor(processors: IndexedSeq[Processor]) extends Processor { | |
var count = 0 | |
val numProcessors = processors.size | |
def advanceClock(currentTime: Int) { | |
processors.foreach(_.advanceClock(currentTime)) | |
} | |
def process(request: Request): Response = { | |
val i = count % numProcessors | |
count += 1 | |
processors(i).process(request) | |
} | |
} | |
def doStats(times: IndexedSeq[Long]) { | |
val sum: Long = times.sum | |
val average = sum/times.size | |
val stddev = times.map(v => math.abs(v - average)).sum/times.size | |
val sorted = times.sorted | |
println("average: " + average) | |
println("stddev: " + stddev) | |
List(.5,.75,.90,.95,.99,.999).foreach{ p => | |
println(" p" + (p * 100) + ": " + sorted((p * times.size).toInt)) | |
} | |
} | |
def doSim(cluster: Processor, qps: Int, median: Int, sigma: Double) { | |
def sim(qps: Int, median: Int, totalRequests: Int): IndexedSeq[Response] = { | |
println("executing " + totalRequests + " requests at " + qps + " qps") | |
val random = new scala.util.Random() | |
def nextLatency = (math.exp(sigma * random.nextGaussian) * median).toInt | |
var clock: Double = 0 | |
val requestsPerMillisecond = qps.toDouble / 1000 | |
var totalWork: Long = 0 | |
val millisPerRequest = 1.toDouble / requestsPerMillisecond | |
println("Millis per request " + millisPerRequest) | |
val result = (1 to totalRequests).map{ i => | |
cluster.advanceClock(clock.toInt) | |
clock += millisPerRequest | |
val latency = nextLatency | |
totalWork += latency | |
cluster.process(new Request(latency)) | |
}.toIndexedSeq | |
val expectedTimeMillis = totalRequests.toDouble / requestsPerMillisecond | |
if (math.round(clock) != math.round(expectedTimeMillis)) { | |
System.err.println("End clock: " + clock + " should be " + expectedTimeMillis) | |
} | |
println("Total work for system: " + totalWork) | |
println("Elapsed time: " + math.round(clock)) | |
result | |
} | |
val times = sim(qps, median, 100000) | |
println("\nQueue Times: ") | |
doStats(times.map(_.queueTime.toLong)) | |
println("\nUpstream Times: ") | |
doStats(times.map(_.request.time.toLong)) | |
} | |
def main(args: Array[String]): Unit = { | |
val workerCount = 9 | |
val loadBalancerCount = 5 | |
val maxconn = 15 | |
def workers = (1 to workerCount).map(i => new Worker(i, maxconn)) | |
val loadBalancers = (1 to loadBalancerCount).map(_ => new RoundRobinProcessor(workers)) | |
val cluster = new RoundRobinProcessor(loadBalancers) | |
doSim(cluster, 3000, 20, 1.9) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment