Skip to content

Instantly share code, notes, and snippets.

@MartinHH
Last active April 3, 2016 20:40
Show Gist options
  • Select an option

  • Save MartinHH/de62b3b081ccfee4ae7320298edd81ee to your computer and use it in GitHub Desktop.

Select an option

Save MartinHH/de62b3b081ccfee4ae7320298edd81ee to your computer and use it in GitHub Desktop.
package example
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.stream.{ClosedShape, ThrottleMode, ActorMaterializerSettings, ActorMaterializer}
import scala.collection.immutable.SortedSet
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import scala.language.postfixOps
/**
 * Prints prime numbers to stdout using an Akka Streams graph with a feedback loop.
 *
 * Candidates 2, 3, 4, ... are zipped with a set of primes, filtered through
 * [[isPrime]], and every surviving number is both printed and routed back
 * towards the set that later candidates are zipped with.
 */
object Sieve extends App {
  implicit val system: ActorSystem = ActorSystem()
  implicit val mat: ActorMaterializer = ActorMaterializer(ActorMaterializerSettings(system))
  implicit val ctx: ExecutionContext = system.dispatcher

  /**
   * Trial-division primality test that reuses already known primes.
   *
   * @param n           the candidate; anything below 2 is reported as not prime
   * @param primesSoFar primes found so far. The remaining divisor search starts
   *                    at the largest element of this set, so the set is assumed
   *                    to hold every prime up to that element.
   * @return `true` if neither an element of `primesSoFar` nor any integer between
   *         its largest element (2 if the set is empty) and `floor(sqrt(n))`
   *         divides `n`
   */
  def isPrime(n: Int, primesSoFar: SortedSet[Int]): Boolean =
    // Guard first: for n < 2 (including the negative values produced once the
    // candidate iterator overflows Int) the divisor range below is empty, which
    // would otherwise classify those numbers as prime.
    n >= 2 &&
      !primesSoFar.exists(n % _ == 0) &&
      !(primesSoFar.lastOption.getOrElse(2) to Math.floor(Math.sqrt(n)).toInt).par.exists(n % _ == 0)

  val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // endless stream of candidates, starting at the first prime
    val naturalNumbers = Source.fromIterator(() => Iterator.from(2))

    // zip the input n with a set of the primes found so far:
    val nAndPreviousResults =
      builder.add(ZipWith[Int, SortedSet[Int], (Int, SortedSet[Int])]((n, primes) => (n, primes)))

    // filter primes using the set of previous results:
    val primes = nAndPreviousResults.out.collect { case (n, p) if isPrime(n, p) => n }

    // fans out the results - one for the output-sink, one for the feedback-sets:
    val primeBroadCast = builder.add(Broadcast[Int](2))

    // source of the result-sets - to make sure this starts, we have
    // to concat an initial empty set (otherwise: deadlock):
    val resultSets = builder.add(Concat[SortedSet[Int]]())

    // wire the initial empty set and the actual result sets to resultSets:
    Source.single(SortedSet[Int]()) ~> resultSets
    // NOTE(review): Akka's `fold` emits its accumulator only when upstream
    // completes; this stream has no visible end, so this branch presumably never
    // emits and the zipped set stays empty. `scan` is not a drop-in replacement:
    // `expand` appears to request a new set only after the current one has been
    // consumed once, which may stall the cycle - TODO confirm before changing.
    primeBroadCast.out(0).fold(SortedSet[Int]())(_ + _) ~> resultSets

    // wire N and the previous results to the zip function - since
    // resultSets emits fewer values than naturalNumbers, we have
    // to expand it:
    naturalNumbers ~> nAndPreviousResults.in0
    resultSets.out.expand(set => Iterator.continually(set)) ~> nAndPreviousResults.in1

    // wire the prime results to the broadcaster:
    primes ~> primeBroadCast.in

    // finally, wire the 2nd broadcast output to the main output sink:
    primeBroadCast.out(1).throttle(100000, 1.second, 100000, ThrottleMode.Shaping) ~> Sink.foreach[Int](println)

    ClosedShape
  })

  // `graph` already is a RunnableGraph, so it can be materialized directly.
  graph.run()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment