Last active
August 29, 2015 13:56
-
-
Save geggo98/8913772 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor._ | |
import akka.event.Logging | |
import akka.util.Timeout | |
import scala.concurrent.duration._ | |
import java.util.concurrent.TimeUnit | |
import scala.concurrent.duration.FiniteDuration | |
import scala.concurrent.ExecutionContext.Implicits.global | |
import scala.reflect.ClassTag | |
/** | |
* Created by sschwets on 07.01.14. | |
*/ | |
object Scribble extends App { | |
case class Calculate[T](values : Seq[T], index : Int, parallelLimit : Int, fn : (T,T) => T) | |
trait CalculateResponse[T] | |
case class CalculationResult[T](result : T, index : Int) extends CalculateResponse[T] | |
case object Busy extends CalculateResponse[Nothing] | |
class ParallelPrefixActor[T](implicit ev : ClassTag[T]) extends Actor { | |
val log = Logging(context.system, this) | |
val subCalculation = Props(classOf[ParallelPrefixActor[T]], ev) | |
def receive = waitForCalculation | |
def waitForCalculation : Actor.Receive = { | |
case c : Calculate[T] => | |
log.debug(s"Start calculation for ${c.values.length} values, segment nr. ${c.index}, from ${c.values.head} to ${c.values.last}") | |
if (c.values.length < c.parallelLimit) { | |
log.debug("Calculating result direct") | |
val result = c.values.reduceLeft(c.fn) | |
sender ! CalculationResult(result, c.index) | |
}else{ | |
val groupSize: Int = Math.max(1, (c.values.length / c.parallelLimit) + Math.min(c.values.length % c.parallelLimit, 1)) | |
log.debug(s"Splitting calculation for ${c.values.length} values up to ${c.parallelLimit} children, ${groupSize} elements each, limit ${c.parallelLimit}") | |
def segments=c.values.grouped(groupSize) // Type iterator should always be declared as def instead of val to avoid bugs with consumed iterators | |
log.debug("Starting children") | |
segments.zipWithIndex.foreach{case (values, index) => | |
context.actorOf(subCalculation) ! c.copy(values = values, index = index) | |
} | |
val partialResults: Array[T] = Array.ofDim[T](segments.length) | |
log.debug(s"Waiting for ${partialResults.length} results (${partialResults.indices})") | |
context.become(waitForResults(segments.length, partialResults, c, sender), discardOld = true) | |
} | |
} | |
def waitForResults(outstandingResults : Int, partialResults : Array[T], originalRequest : Calculate[T], originalSender : ActorRef) : Actor.Receive = { | |
case c : Calculate[_] => sender ! Busy | |
case r : CalculationResult[T] => | |
log.debug(s"Putting result ${r.result} on position ${r.index} in ${partialResults.length}") | |
val updatedResults = partialResults.updated(r.index, r.result) | |
log.debug("Killing sub-worker") | |
sender ! PoisonPill | |
if (outstandingResults==1) { | |
log.debug("Calculating result from partial results") | |
val result = updatedResults.reduceLeft(originalRequest.fn) | |
originalSender ! CalculationResult(result, originalRequest.index) | |
context.become(waitForCalculation, discardOld = true) | |
}else{ | |
log.debug(s"Still waiting for ${outstandingResults-1} results") | |
// For fanOut > 2 one could here already combine consecutive partial results | |
context.become(waitForResults(outstandingResults-1, updatedResults, originalRequest, originalSender), discardOld = true) | |
} | |
} | |
} | |
class LoggingActor extends Actor { | |
val log = Logging(context.system, this) | |
def receive : Actor.Receive = { | |
case any => log.info(s"""Received "${any.toString}".""") | |
} | |
} | |
// Setup the actor system | |
val system = ActorSystem("root") | |
// Start one calculation actor | |
val calculationProps = Props(classOf[ParallelPrefixActor[BigInt]], ClassTag(classOf[BigInt])) | |
val loggerProps = Props(classOf[LoggingActor]) | |
val calculateActor = system.actorOf(calculationProps, "Calcolon-BigInt") | |
val inbox = Inbox.create(system) | |
val timeOut = FiniteDuration(10, TimeUnit.SECONDS) | |
// Helper function to measure time | |
def time[A] (id : String)(f: => A) = { | |
val start = System.nanoTime() | |
val result = f | |
val stop = System.nanoTime() | |
println(s"""Time for "${id}": ${(stop-start)*1e-6d}ms""") | |
result | |
} | |
// Test code | |
val parallelLimit = 500 | |
val limit = 30000 | |
def testRange = (1 to limit).map(BigInt(_)) | |
time("par product")(testRange.par.product) // 1446.251822ms | |
time("actor product"){ // 1407.4532849999998ms | |
inbox.send(calculateActor, Calculate[BigInt](testRange, 0, parallelLimit, _ * _)) | |
inbox.receive(timeOut) | |
} | |
time("par sum")(testRange.par.sum) // 19.728831ms | |
time("actor sum"){ // 142.973606ms | |
inbox.send(calculateActor, Calculate[BigInt](testRange, 0, parallelLimit, _ + _)) | |
inbox.receive(timeOut) | |
} | |
// Alternative Test | |
import akka.pattern.{ ask, pipe } | |
implicit val timeout = Timeout(30 seconds) | |
val futureResult=calculateActor ? Calculate[BigInt](testRange, 0, parallelLimit, _ * _) | |
val futureNumberOfDigits=futureResult.mapTo[CalculateResponse[BigInt]]. | |
collect{case CalculationResult(value, _) => value}.map(_.toString.length).filter(_>10) | |
futureNumberOfDigits.onSuccess{ | |
case digits : Int => println(s"Anzahl der Stellen: $digits") | |
} | |
val loggerActor=system.actorOf(loggerProps) | |
val futurePipe=futureNumberOfDigits pipeTo loggerActor | |
// Erster blockierender Aufruf | |
import scala.concurrent.Await | |
Await.result(futurePipe, atMost=timeout.duration) | |
// Shutdown | |
Thread.sleep(5000) | |
system.shutdown() | |
System.exit(0) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment