Created
August 9, 2017 15:29
-
-
Save lJoublanc/cbd10d947eba2da76b32678edd03ea5d to your computer and use it in GitHub Desktop.
Example of race condition fs2 branch 0.10.0: Signal completes before we can get any values out.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import fs2._ | |
import cats.effect._ | |
import fs2.async.mutable.Signal | |
import scala.concurrent.ExecutionContext.Implicits.global | |
implicit val ioInstance = Effect[IO] | |
val input = Stream.range(1,20).covary[IO] // <-- try changing this to range(1,2), result is empty! | |
val output : Stream[IO,Signal[IO,Option[Int]]] = | |
Stream eval async.signalOf[IO,Option[Int]](Some(0)) flatMap { sig => | |
Stream(sig) concurrently input.noneTerminate.evalMap(sig.set).drain | |
} | |
val apis = output flatMap { o => | |
val api1 = o.discrete.unNoneTerminate.fold(Nil : List[Int])((l,i) => i :: l) | |
val api2 = o.discrete.unNoneTerminate.fold(Nil : List[Int])((l,i) => i :: l) | |
api1 merge api2 | |
} | |
apis.runLog.unsafeRunSync() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment