Last active
January 20, 2019 07:52
-
-
Save pchlupacek/f53f63711d9a71a7b3aba19187503ab1 to your computer and use it in GitHub Desktop.
fs2 par join - comparison
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* related to case https://github.com/functional-streams-for-scala/fs2/issues/1397 **/ | |
package fs2.concurrent | |
import java.util.concurrent.Executors | |
import cats.Traverse | |
import cats.implicits._ | |
import cats.effect.{ExitCode, IO, IOApp} | |
import scala.annotation.tailrec | |
import scala.concurrent.{Await, ExecutionContext, Future} | |
import scala.concurrent.duration._ | |
object Application extends IOApp { | |
private def isPrime(n: Long): (Long, Boolean) = n match { | |
case 1 => (1, true) | |
case n => | |
val sqn = Math.sqrt(n) | |
var i = 2 | |
while (i <= sqn) { | |
if (n % i == 0) return (n, false) | |
i += 1 | |
} | |
(n, true) | |
} | |
val concurrency = Runtime.getRuntime.availableProcessors().max(4) | |
implicit val ec = | |
ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(concurrency)) | |
@tailrec | |
private def isMonotonicallyIncreasing(ns: Vector[(Long, Boolean)]): Boolean = | |
if (ns.size <= 1) true | |
else { | |
if (ns(0)._1 <= ns(1)._1) isMonotonicallyIncreasing(ns.tail) | |
else false | |
} | |
def concurrent7_parallel_future() = { | |
val data = (9138000000L to 9139000000L).toVector | |
val computation = Future.sequence(data.map(x => Future(isPrime(x))(ec))) | |
val result = Await.result(computation, Duration.Inf) | |
result | |
} | |
def concurrent8_parallel_execution_io() = { | |
val data = (9138000000L to 9139000000L).toVector | |
val computation = for { | |
fibers <- Traverse[Vector].sequence(data.map(x => IO(isPrime(x)).start)) | |
results <- Traverse[Vector].sequence(fibers.map(_.join)) | |
} yield results | |
computation.unsafeRunSync() | |
} | |
def concurrent9_parallel_execution_fs2: Vector[(Long, Boolean)] = { | |
import fs2.Stream | |
val offset: Long = 9138000000L | |
val data: Stream[IO, Int] = | |
Stream.range(0, 1000000) | |
implicit val concurrent = IO.ioConcurrentEffect(IO.contextShift(ec)) | |
val computations: Stream[IO, (Long, Boolean)] = | |
data.balanceThrough(Int.MaxValue, concurrency) { balanced => | |
balanced.map { idx => | |
isPrime(idx + offset) | |
} | |
} | |
computations.compile.toVector.unsafeRunSync() | |
} | |
def time(name: String)(action: => Vector[(Long, Boolean)]): Unit = { | |
println(s"starting $name") | |
val start = System.nanoTime() | |
val results = action | |
val end = System.nanoTime() | |
val delta = end - start | |
println(s"$name took ${delta.nanos.toMillis} millis") | |
println(s"$name results: ${results.size}") | |
println(s"$name isMonotonic: ${isMonotonicallyIncreasing(results)}") | |
} | |
override def run(args: List[String]): IO[ExitCode] = | |
IO { | |
time("SFututre")(concurrent7_parallel_future) | |
time("IO")(concurrent8_parallel_execution_io) | |
time("FS2")(concurrent9_parallel_execution_fs2) | |
ec.shutdown() | |
ExitCode.Success | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment