Skip to content

Instantly share code, notes, and snippets.

@igor-vovk
Created September 24, 2023 12:52
Show Gist options
  • Save igor-vovk/13ec8e5494b9971b9237c83d4e05a099 to your computer and use it in GitHub Desktop.
Save igor-vovk/13ec8e5494b9971b9237c83d4e05a099 to your computer and use it in GitHub Desktop.
Obtaining cats.effect.Ref that is always set to a let element of fs2.Stream
/**
* Returns a resource that contains a ref set to the last element of the stream.
*/
def streamRef[F[_] : Async, A](stream: Stream[F, A], default: A): Resource[F, RefSource[F, A]] = {
val ri = for
ref <- Ref.of(default)
interrupt <- SignallingRef.of(false)
_ <- Async[F].start {
stream
.foreach(ref.set)
.interruptWhen(interrupt)
.compile.drain
}
yield (ref, interrupt)
Resource
.make(ri)(_._2.set(true))
.map(_._1)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment