Created
April 16, 2021 08:10
-
-
Save hanny24/b6e5789f47b8d38340d0e10b001fdb6b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.concurrent.{Deferred, Ref} | |
import cats.effect.{ExitCode, IO, IOApp} | |
import fs2.Chunk | |
import scala.concurrent.duration._ | |
import scala.util.Random | |
/** Reproduction of a concurrency difference between `mergeHaltBoth` and `parJoinUnbounded`.
  *
  * An endless counter stream is merged with a "terminate" stream, each counter value is
  * expanded into six elements, and every element becomes a one-shot inner stream that
  * sleeps for a random time below 1500 ms. The inner streams are run with `parJoin(40)`.
  *
  * The output listings that follow this object in the file contrast what is observed
  * (the "currently running" count stays at 6 or below) with what is expected (the count
  * climbs to 40). The commented-out `parJoinUnbounded` variant inside `run` is the one
  * noted as working as expected.
  */
object FS2Test extends IOApp {

  /** Builds and drains the test pipeline.
    *
    * @param args command-line arguments (unused)
    * @return `ExitCode.Success` once the pipeline has been drained
    */
  override def run(args: List[String]): IO[ExitCode] = {
    // Not completed anywhere in this object, so waiting on it does not finish by itself.
    val stopGate = Deferred.unsafe[IO, Unit]

    // Number of simulated jobs that have started but not yet finished.
    val inFlight = Ref.unsafe[IO, Int](0)

    // Endless source: prints and emits 0, 1, 2, ... wrapped in Some for unNoneTerminate.
    val counter = fs2.Stream
      .unfoldEval[IO, Int, Int](0) { n =>
        IO {
          println(s"received ${n}")
          Some((n, n + 1))
        }
      }
      .repeat
      .map(Some(_))

    // Emits nothing; only waits on the gate.
    val halt = fs2.Stream.eval_(stopGate.get)

    // One simulated unit of work: bump the counter, report it, sleep, then decrement.
    val simulatedJob: IO[Unit] =
      for {
        delayMs <- IO(Random.nextInt(1500))
        active <- inFlight.modify { before =>
          val after = before + 1
          (after, after)
        }
        _ <- IO(println(s"currently running ${active}"))
        _ <- IO.sleep(delayMs.millis)
        _ <- inFlight.update(_ - 1)
      } yield ()

    // this works as expected
    // val merged = fs2.Stream(counter, halt).parJoinUnbounded.unNoneTerminate
    val merged = counter.mergeHaltBoth(halt).unNoneTerminate

    // Fan every received value out into six elements delivered as a single chunk.
    val fannedOut = merged.flatMap { item =>
      fs2.Stream.chunk(
        Chunk(s"$item:1", s"$item:2", s"$item:3", s"$item:4", s"$item:5", s"$item:6")
      )
    }

    fannedOut
      .map(_ => fs2.Stream.eval(simulatedJob))
      .parJoin(40)
      .compile
      .drain
      .as(ExitCode.Success)
  }
}
// actual output:
// received 0 | |
// currently running 2 | |
// currently running 3 | |
// currently running 1 | |
// currently running 4 | |
// currently running 6 | |
// currently running 5 | |
// received 1 | |
// currently running 1 | |
// currently running 2 | |
// currently running 3 | |
// currently running 4 | |
// currently running 5 | |
// currently running 6 | |
// received 2 | |
// currently running 1 | |
// currently running 2 | |
// currently running 3 | |
// currently running 4 | |
// currently running 5 | |
// currently running 6 | |
// received 3 | |
// ... | |
// expected output:
// received 0 | |
// received 1 | |
// received 2 | |
// currently running 7 | |
// currently running 5 | |
// currently running 1 | |
// currently running 9 | |
// currently running 10 | |
// currently running 3 | |
// currently running 8 | |
// currently running 4 | |
// currently running 11 | |
// currently running 12 | |
// currently running 6 | |
// currently running 2 | |
// received 3 | |
// currently running 13 | |
// currently running 14 | |
// currently running 15 | |
// currently running 16 | |
// currently running 17 | |
// currently running 18 | |
// received 4 | |
// currently running 19 | |
// currently running 20 | |
// currently running 21 | |
// currently running 22 | |
// currently running 23 | |
// currently running 24 | |
// received 5 | |
// currently running 25 | |
// currently running 26 | |
// currently running 27 | |
// currently running 28 | |
// currently running 29 | |
// currently running 30 | |
// received 6 | |
// currently running 31 | |
// currently running 32 | |
// currently running 33 | |
// currently running 34 | |
// currently running 35 | |
// currently running 36 | |
// received 7 | |
// currently running 37 | |
// currently running 38 | |
// currently running 39 | |
// currently running 40 | |
// currently running 40 | |
// currently running 40 | |
// received 8 | |
// currently running 40 | |
// currently running 40 | |
// currently running 40 | |
// currently running 40 | |
// currently running 40 | |
// received 9 | |
// currently running 40 | |
// currently running 40 | |
// currently running 40 | |
// currently running 40 | |
// currently running 40 | |
// currently running 40 | |
// currently running 40 |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment