Skip to content

Instantly share code, notes, and snippets.

@hoffrocket
Created March 6, 2013 23:05
Show Gist options
  • Save hoffrocket/5104055 to your computer and use it in GitHub Desktop.
Save hoffrocket/5104055 to your computer and use it in GitHub Desktop.
Distributed Application Simulator (may be buggy and/or completely incorrect)
object QueueSim {
/**
 * A single FIFO queue of outstanding work.
 *
 * The backlog is tracked as one integer (`remainingWork`) that grows when a
 * request is accepted and shrinks as the simulation clock moves forward.
 */
class QueuedProcessor extends Processor {
  /** Clock value passed to the most recent `advanceClock` call. */
  var lastClock: Int = 0

  /** Work still queued, expressed in the same units as the clock. */
  var remainingWork: Int = 0

  /**
   * Drains the backlog by the time elapsed since the previous call
   * (never below zero) and remembers `currentTime` as the new reference point.
   */
  def advanceClock(currentTime: Int): Unit = {
    val elapsed = currentTime - lastClock
    remainingWork = math.max(0, remainingWork - elapsed)
    lastClock = currentTime
  }

  /**
   * Appends `request` to the queue.
   *
   * @return a [[Response]] whose queue time is the backlog that was already
   *         waiting before this request was added
   */
  def process(request: Request): Response = {
    val backlogAhead = remainingWork
    remainingWork = backlogAhead + request.time
    Response(backlogAhead, request)
  }
}
/**
 * Common interface for every simulated component that can accept requests
 * (individual queues, workers, and round-robin balancers all implement it).
 */
trait Processor {
/**
 * Informs the processor that the simulation clock now reads `currentTime`.
 * As driven by `doSim`, the clock value is in milliseconds.
 */
def advanceClock(currentTime: Int): Unit
/**
 * Submits `request` for processing and returns the resulting [[Response]],
 * which carries the time the request spent queued.
 */
def process(request: Request): Response
}
/**
 * A worker that owns `concurrency` independent queues ("threads") and sends
 * each incoming request to whichever queue currently has the least backlog.
 *
 * @param id          identifier of this worker (used in error messages only)
 * @param concurrency number of parallel queues; must be positive for
 *                    `process` to succeed
 */
case class Worker(id: Int, concurrency: Int) extends Processor {
  /** One queue per simulated thread; empty when `concurrency <= 0`. */
  val workerThreads: Seq[QueuedProcessor] = (1 to concurrency).map(_ => new QueuedProcessor())

  /** Advances the clock of every thread queue to `currentTime`. */
  def advanceClock(currentTime: Int): Unit = {
    workerThreads.foreach(_.advanceClock(currentTime))
  }

  /**
   * Dispatches `request` to the thread with the smallest `remainingWork`.
   * On ties the earliest thread in `workerThreads` wins.
   *
   * @throws IllegalStateException if this worker has no threads
   */
  def process(request: Request): Response = {
    // Strict `<` keeps the first minimum, so ties resolve to the earliest thread.
    val leastUtilizedThread = workerThreads
      .reduceLeftOption { (best, candidate) =>
        if (candidate.remainingWork < best.remainingWork) candidate else best
      }
      .getOrElse(
        throw new IllegalStateException(
          "Worker " + id + " has no threads to process requests (concurrency=" + concurrency + ")"
        )
      )
    leastUtilizedThread.process(request)
  }
}
/**
 * A unit of work submitted to a [[Processor]].
 *
 * @param time amount of work the request needs, in the same units as the
 *             simulation clock (`doSim` generates it as a latency value)
 */
case class Request(time: Int)
/**
 * Result of handing a [[Request]] to a processor.
 *
 * @param queueTime how long the request waited behind earlier work
 * @param request   the request this response belongs to
 */
case class Response(queueTime: Int, request: Request) {
  /** Time spent waiting in the queue plus the request's own processing time. */
  def totalTime: Int = request.time + queueTime
}
/**
 * Distributes requests over `processors` in strict rotation.
 *
 * @param processors downstream processors, visited in index order
 */
case class RoundRobinProcessor(processors: IndexedSeq[Processor]) extends Processor {
  /** Number of requests dispatched so far; selects the next target. */
  var count: Int = 0

  /** Cached size of `processors`. */
  val numProcessors: Int = processors.size

  /** Forwards the clock update to every downstream processor. */
  def advanceClock(currentTime: Int): Unit = {
    processors.foreach(_.advanceClock(currentTime))
  }

  /**
   * Sends `request` to the next processor in the rotation.
   *
   * @throws IllegalStateException if there are no downstream processors
   */
  def process(request: Request): Response = {
    if (numProcessors == 0) {
      throw new IllegalStateException("RoundRobinProcessor has no processors to dispatch to")
    }
    // Non-negative modulo: once `count` wraps past Int.MaxValue a plain `%`
    // would yield a negative index.
    val i = ((count % numProcessors) + numProcessors) % numProcessors
    count += 1
    processors(i).process(request)
  }
}
/**
 * Prints the mean, population standard deviation, and a fixed set of
 * percentiles (p50 .. p99.9) of `times` to standard output.
 *
 * An empty input prints a single "no samples" line instead of failing.
 *
 * @param times the samples to summarise
 */
def doStats(times: IndexedSeq[Long]): Unit = {
  if (times.isEmpty) {
    println("no samples")
  } else {
    val sampleCount = times.size
    // Floating-point mean: integer division would truncate and skew the deviation.
    val average = times.sum.toDouble / sampleCount
    val variance = times.map { v =>
      val delta = v - average
      delta * delta
    }.sum / sampleCount
    val stddev = math.sqrt(variance)
    val sorted = times.sorted
    println("average: " + average)
    println("stddev: " + stddev)
    List(.5, .75, .90, .95, .99, .999).foreach { p =>
      // p < 1, so the truncated index is always within bounds.
      println(" p" + (p * 100) + ": " + sorted((p * sampleCount).toInt))
    }
  }
}
/**
 * Drives `cluster` with evenly spaced requests and prints statistics for the
 * observed queue times and for the generated request latencies.
 *
 * Each request's latency is drawn as `exp(sigma * gaussian) * median`,
 * truncated to an Int. Requests arrive at a fixed interval of `1000 / qps`
 * milliseconds, and the cluster clock is advanced before every request.
 *
 * @param cluster       processor (tree) under test
 * @param qps           request rate in queries per second; must be positive
 * @param median        median request latency
 * @param sigma         spread of the latency distribution
 * @param totalRequests number of requests to simulate; must be positive
 */
def doSim(cluster: Processor, qps: Int, median: Int, sigma: Double, totalRequests: Int = 100000): Unit = {
  // qps == 0 would make the inter-arrival time infinite.
  require(qps > 0, "qps must be positive, got " + qps)
  require(totalRequests > 0, "totalRequests must be positive, got " + totalRequests)

  val random = new scala.util.Random()
  def nextLatency: Int = (math.exp(sigma * random.nextGaussian()) * median).toInt

  println("executing " + totalRequests + " requests at " + qps + " qps")
  val requestsPerMillisecond = qps.toDouble / 1000
  val millisPerRequest = 1.toDouble / requestsPerMillisecond
  println("Millis per request " + millisPerRequest)

  var clock: Double = 0
  var totalWork: Long = 0
  // Vector.fill evaluates its element expression once per slot, in order.
  val responses: IndexedSeq[Response] = Vector.fill(totalRequests) {
    cluster.advanceClock(clock.toInt)
    clock += millisPerRequest
    val latency = nextLatency
    totalWork += latency
    cluster.process(Request(latency))
  }

  // Sanity check against drift from repeatedly adding a floating-point step.
  val expectedTimeMillis = totalRequests.toDouble / requestsPerMillisecond
  if (math.round(clock) != math.round(expectedTimeMillis)) {
    System.err.println("End clock: " + clock + " should be " + expectedTimeMillis)
  }
  println("Total work for system: " + totalWork)
  println("Elapsed time: " + math.round(clock))

  println("\nQueue Times: ")
  doStats(responses.map(_.queueTime.toLong))
  println("\nUpstream Times: ")
  doStats(responses.map(_.request.time.toLong))
}
/**
 * Entry point: builds a two-level round-robin cluster (balancers in front of
 * workers) and runs the simulation at 3000 qps with median latency 20 and
 * sigma 1.9.
 */
def main(args: Array[String]): Unit = {
  val (numWorkers, numBalancers, threadsPerWorker) = (9, 5, 15)

  // Evaluated anew on every reference, so each balancer fronts its own set of
  // workers (numBalancers * numWorkers workers in total).
  // NOTE(review): if a single shared worker pool was intended this should be a
  // `val` -- confirm before changing, as it alters the simulated capacity.
  def freshWorkers: IndexedSeq[Processor] =
    Vector.tabulate(numWorkers)(idx => Worker(idx + 1, threadsPerWorker))

  val balancers: IndexedSeq[Processor] =
    Vector.fill(numBalancers)(RoundRobinProcessor(freshWorkers))

  doSim(RoundRobinProcessor(balancers), qps = 3000, median = 20, sigma = 1.9)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment