Skip to content

Instantly share code, notes, and snippets.

@leszekgruchala
Created April 25, 2020 10:03
Show Gist options
  • Save leszekgruchala/83407420a81430d252a103cdaf10aee9 to your computer and use it in GitHub Desktop.
Save leszekgruchala/83407420a81430d252a103cdaf10aee9 to your computer and use it in GitHub Desktop.
ZStream.interruptWhen
import zio._
import zio.stream._
import zio.duration._
object TestStream extends App {
//There are few issues:
// 1. .interruptWhen(hook) //BOOM: With this, only 1 element of A will be processed, the program no longer processes anything
// 2. The program is unable to finish with status code 0
// 3. "Processes N element" never prints
def stream[R](hook: Promise[Throwable, Boolean]) = {
def streamOf(name: String) =
ZStream
.fromEffect(
ZIO.sleep(1.second) *> ZIO.effectTotal(Seq(name))
)
.flatMap(events => Stream.fromIterable(events))
.mapM(e => ZIO.effect(println(s"Processing element of $e")))
(streamOf("A") ++ streamOf("B").forever)
.interruptWhen(hook)
// .haltWhen(hook) //Working: With this, 1 element of A and forever elements of B
.runCount
}
val test =
ZManaged.make(
for {
_ <- ZIO.effect(println("Starting"))
hook <- Promise.make[Throwable, Boolean]
streamFib <- stream(hook).fork
} yield (hook, streamFib)
) {
case (hook, streamFib) =>
(for {
_ <- ZIO.effect(println("Stopping"))
_ <- hook.succeed(true)
count <- streamFib.join
_ <- ZIO.effect(println("Finished processing"))
_ <- ZIO.effect(println("Interrupting"))
_ <- streamFib.interrupt //In my production application, this hangs, here it's fine.
_ <- ZIO.effect(println("Interrupted"))
} yield count).foldM(
t => IO.effectTotal("Failed to terminate the stream", t),
count => IO.effectTotal(s"Processed $count element") //Never prints
)
}
override def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] =
test.useForever.fold(t => { println(s"Failed with ${t.getMessage}"); 1 }, _ => 0)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment