Last active
April 3, 2016 20:40
-
-
Save MartinHH/de62b3b081ccfee4ae7320298edd81ee to your computer and use it in GitHub Desktop.
Code example for https://stackoverflow.com/questions/36381603/how-can-i-modify-my-akka-streams-prime-sieve-to-exclude-modulo-checks-for-known
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| package example | |
| import akka.actor.ActorSystem | |
| import akka.stream.scaladsl._ | |
| import akka.stream.{ClosedShape, ThrottleMode, ActorMaterializerSettings, ActorMaterializer} | |
| import scala.collection.immutable.SortedSet | |
| import scala.concurrent.duration._ | |
| import scala.concurrent.ExecutionContext | |
| import scala.language.postfixOps | |
/**
 * Prime sieve expressed as a cyclic Akka Streams graph.
 *
 * The natural numbers starting at 2 are zipped with the set of primes found
 * so far; every number that passes [[isPrime]] is printed and also fed back
 * into that set, so later candidates can be checked against the primes that
 * are already known.
 */
object Sieve extends App {

  implicit val system: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(system))
  implicit val ctx: ExecutionContext = system.dispatcher

  /**
   * Trial-division primality test that reuses already known primes.
   *
   * `n` is first checked against every element of `primesSoFar`. Because the
   * feedback set may lag behind the candidate, every integer from the largest
   * known prime (or 2 if the set is empty) up to `sqrt(n)` is then checked as
   * well.
   *
   * @param n           the candidate number
   * @param primesSoFar primes discovered so far; the result is only
   *                    meaningful if this holds all primes up to its largest
   *                    element
   * @return `true` if `n >= 2` and no divisor was found, `false` otherwise
   */
  def isPrime(n: Int, primesSoFar: SortedSet[Int]): Boolean = {
    val upperBound = math.sqrt(n.toDouble).toInt
    val firstUnknown = primesSoFar.lastOption.getOrElse(2)
    // 0, 1 and negative numbers are not prime; without this guard both
    // checks below would run over empty collections and report `true`.
    n >= 2 &&
      !primesSoFar.exists(n % _ == 0) &&
      !(firstUnknown to upperBound).exists(n % _ == 0)
  }

  val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val naturalNumbers = Source.fromIterator(() => Iterator.from(2))

    // zip the input n with a set of the primes found so far:
    val nAndPreviousResults =
      builder.add(ZipWith[Int, SortedSet[Int], (Int, SortedSet[Int])]((n, primes) => (n, primes)))

    // filter primes using the set of previous results:
    val primes = nAndPreviousResults.out.collect { case (n, p) if isPrime(n, p) => n }

    // fans out the results - one for the output-sink, one for the feedback-sets:
    val primeBroadCast = builder.add(Broadcast[Int](2))

    // source of the result-sets - an initial empty set is concatenated in
    // front of the feedback so that the zip stage can start right away:
    val resultSets = builder.add(Concat[SortedSet[Int]]())

    // wire the initial empty set and the actual result sets to resultSets.
    // `scan` (not `fold`) is required here: `fold` only emits once its
    // upstream completes, which never happens for the infinite source of
    // natural numbers, so the feedback set would stay empty forever.
    Source.single(SortedSet[Int]()) ~> resultSets
    primeBroadCast.out(0).scan(SortedSet[Int]())(_ + _) ~> resultSets

    // wire N and the previous results to the zip function - since
    // resultSets emits fewer values than naturalNumbers, we have
    // to expand it:
    naturalNumbers ~> nAndPreviousResults.in0
    resultSets.out.expand(set => Iterator.continually(set)) ~> nAndPreviousResults.in1

    // wire the prime results to the broadcaster:
    primes ~> primeBroadCast.in

    // finally, wire the 2nd broadcast output to the main output sink:
    primeBroadCast.out(1).throttle(100000, 1.second, 100000, ThrottleMode.Shaping) ~>
      Sink.foreach[Int](println)

    ClosedShape
  })

  graph.run()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment