Skip to content

Instantly share code, notes, and snippets.

@marcinwadon
Created January 20, 2022 23:48
Show Gist options
  • Save marcinwadon/bc5bfa0c166533c57a0d8691cf23d084 to your computer and use it in GitHub Desktop.
Save marcinwadon/bc5bfa0c166533c57a0d8691cf23d084 to your computer and use it in GitHub Desktop.
import cats._
import cats.data._
import cats.effect._
import cats.syntax.all._
import cats.effect.unsafe.implicits.global
import fs2._
import fs2.concurrent._
import scala.concurrent.duration._
sealed trait Event
case object Empty extends Event
case object Data extends Event
case class Test(text: String) extends Event
val metered = Stream.awakeEvery[IO](2.seconds).as(Empty).take(10)
val data1 = Stream.awakeDelay[IO](5.seconds).as(Data).take(1)
val data2 = Stream.awakeDelay[IO](10.seconds).as(Data).take(1)
def switchRepeat(every: FiniteDuration, switchTo: Stream[IO, Event]): Pipe[IO, Event, Event] =
in =>
Stream.force {
for {
result <- Channel.unbounded[IO, Event]
sig <- SignallingRef.of[IO, Boolean](true)
ref <- IO.ref[Long](0)
} yield {
def update = IO.monotonic.map(_.toNanos).flatMap(a => ref.modify(_ => (a, a)))
def check(last: Long) =
IO.sleep(every) >>
ref.get.map(_ == last).ifM(sig.set(false), sig.set(true))
def go(s: Stream[IO, Event]): Pull[IO, Event, Unit] =
s.pull.uncons1.flatMap {
case None => Pull.eval(result.close.void)
case Some((hd, tl)) =>
Pull.eval(update.flatMap { x =>
check(x).start
} >> result.send(hd)) >> go(tl)
}
val checking: Stream[IO, Event] = go(in).stream
def f(s: Stream[IO, Event]): Pull[IO, Event, Unit] =
s.pull.uncons1.flatMap {
case None => Pull.done
case Some((hd, tl)) => Pull.eval(result.send(hd)) >> f(tl)
}
val switching = f(switchTo.pauseWhen(sig.discrete)).stream
result.stream.concurrently(checking).concurrently(switching)
}
}
data1
.merge(data2)
.interruptAfter(16.seconds)
.through(switchRepeat(6.seconds, metered))
.compile
.toList
.unsafeRunSync()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment