Created
January 20, 2022 23:48
-
-
Save marcinwadon/bc5bfa0c166533c57a0d8691cf23d084 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats._ | |
import cats.data._ | |
import cats.effect._ | |
import cats.syntax.all._ | |
import cats.effect.unsafe.implicits.global | |
import fs2._ | |
import fs2.concurrent._ | |
import scala.concurrent.duration._ | |
sealed trait Event | |
case object Empty extends Event | |
case object Data extends Event | |
case class Test(text: String) extends Event | |
val metered = Stream.awakeEvery[IO](2.seconds).as(Empty).take(10) | |
val data1 = Stream.awakeDelay[IO](5.seconds).as(Data).take(1) | |
val data2 = Stream.awakeDelay[IO](10.seconds).as(Data).take(1) | |
def switchRepeat(every: FiniteDuration, switchTo: Stream[IO, Event]): Pipe[IO, Event, Event] = | |
in => | |
Stream.force { | |
for { | |
result <- Channel.unbounded[IO, Event] | |
sig <- SignallingRef.of[IO, Boolean](true) | |
ref <- IO.ref[Long](0) | |
} yield { | |
def update = IO.monotonic.map(_.toNanos).flatMap(a => ref.modify(_ => (a, a))) | |
def check(last: Long) = | |
IO.sleep(every) >> | |
ref.get.map(_ == last).ifM(sig.set(false), sig.set(true)) | |
def go(s: Stream[IO, Event]): Pull[IO, Event, Unit] = | |
s.pull.uncons1.flatMap { | |
case None => Pull.eval(result.close.void) | |
case Some((hd, tl)) => | |
Pull.eval(update.flatMap { x => | |
check(x).start | |
} >> result.send(hd)) >> go(tl) | |
} | |
val checking: Stream[IO, Event] = go(in).stream | |
def f(s: Stream[IO, Event]): Pull[IO, Event, Unit] = | |
s.pull.uncons1.flatMap { | |
case None => Pull.done | |
case Some((hd, tl)) => Pull.eval(result.send(hd)) >> f(tl) | |
} | |
val switching = f(switchTo.pauseWhen(sig.discrete)).stream | |
result.stream.concurrently(checking).concurrently(switching) | |
} | |
} | |
data1 | |
.merge(data2) | |
.interruptAfter(16.seconds) | |
.through(switchRepeat(6.seconds, metered)) | |
.compile | |
.toList | |
.unsafeRunSync() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment