Created
July 20, 2023 12:10
-
-
Save kamilkloch/1e39010624f3e5ed4c5d38699da9c753 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.std.Supervisor | |
import cats.effect.{IO, IOApp} | |
import fs2.Stream | |
import fs2.concurrent.Topic | |
import cats.syntax.all._ | |
import scala.concurrent.duration.DurationInt | |
/** Small harness around an fs2 `Topic`: one supervised publisher pushes a timestamp every 500ms
  * while `n` subscribers, started in parallel, drain the topic. Publishes that take noticeably
  * long are reported on stdout.
  */
object Topics extends IOApp.Simple {

  /** Feeds the provided topic with a timestamp every 500ms.
    *
    * Each iteration first waits 500ms, then publishes the current wall-clock time (epoch millis)
    * and finally prints a `Delay: …ms` line when more than 1ms of wall-clock time elapsed between
    * reading the timestamp and the publish returning.
    *
    * @param topic the topic to publish timestamps to
    * @return an effect that loops forever; it fails with `Exception("Topic closed")` as soon as
    *         `publish1` reports the topic as closed
    */
  def tsService(topic: Topic[IO, Long]): IO[Nothing] = {
    val publishOnce: IO[Unit] =
      for {
        publishedAt <- IO.realTime.map(_.toMillis)
        outcome <- topic.publish1(publishedAt)
        // Raise the failure inside IO instead of throwing from a pure `map` callback.
        _ <- outcome.fold(
          _ => IO.raiseError[Unit](new Exception("Topic closed")),
          _ => IO.unit
        )
        finishedAt <- IO.realTime.map(_.toMillis)
        delayMillis = math.abs(finishedAt - publishedAt)
        // Printing is a side effect, so it is sequenced as an effect rather than hidden in `map`.
        _ <- IO.whenA(delayMillis > 1)(IO.println(s"Delay: ${delayMillis}ms"))
      } yield ()

    publishOnce.delayBy(500.millis).foreverM
  }

  /** Number of subscriber streams started by [[run]]. */
  val n: Int = 10_000

  /** Value handed to `Topic#subscribe` for every subscriber. */
  val consumerQueueSize: Int = 1024

  /** Subscribes to `topic` using [[consumerQueueSize]] and returns the resulting stream. */
  def responseStreamFromTopic(topic: Topic[IO, Long]): Stream[IO, Long] =
    topic.subscribe(consumerQueueSize)

  /** Creates the topic, starts [[tsService]] under a `Supervisor` and drains `n` subscriber
    * streams in parallel.
    */
  def run: IO[Unit] =
    Supervisor[IO].use { sup =>
      for {
        tsTopic <- Topic[IO, Long]
        _ <- tsService(tsTopic).supervise(sup)
        _ <- List.fill(n)(0).parTraverse_(_ => responseStreamFromTopic(tsTopic).compile.drain)
      } yield ()
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment