Last active
July 21, 2023 13:17
-
-
Save kamilkloch/9a083701ee4ee9e2379d9585c8e4f0ed to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.std.Supervisor | |
import cats.effect.{IO, IOApp} | |
import cats.syntax.all._ | |
import org.HdrHistogram.{ConcurrentHistogram, Histogram} | |
import scala.concurrent.duration.DurationInt | |
import scala.util.chaining.scalaUtilChainingOps | |
/** Benchmark of a polled, `Ref`-backed signal.
  *
  * A single producer fiber publishes a new timestamp every 500ms while `n` consumer fibers poll
  * the same signal every 5ms and record, in milliseconds, the gap between the producer's
  * timestamp and the moment they observed it. The workload is run twice (warmup, then the
  * measured run) and the latency percentile distribution of the second run is printed to stdout.
  */
object RefProducer extends IOApp.Simple {

  /** Number of consumer fibers started in parallel for each run. */
  val n = 20_000

  type Watermark = Long

  /** Value carried by the signal: the producer-side wall-clock time in epoch milliseconds. */
  case class Payload(ts: Long)

  /** Latency histogram shared by all consumers; tracks 1..10000 (ms) at 3 significant digits. */
  private val hist: Histogram = new ConcurrentHistogram(1L, 10_000L, 3)

  /** A single-slot signal whose every write is tagged with a monotonically increasing watermark. */
  trait Sig {

    /** Publishes a fresh [[Payload]] and advances the watermark by one. */
    def write: IO[Unit]

    /** Reads the current value relative to the watermark the caller saw last.
      *
      * @param prevWatermark watermark returned by the caller's previous successful read, if any
      * @return `None` when the watermark is unchanged; `Some` with the current watermark and
      *         payload when there is no previous watermark or the watermark advanced by exactly
      *         one. The effect fails with an `Exception` for any other watermark difference.
      */
    def read(prevWatermark: Option[Watermark]): IO[Option[(Watermark, Payload)]]
  }

  object Sig {

    /** Creates a [[Sig]] starting at watermark 0 with the creation time as its payload. */
    def create: IO[Sig] =
      for {
        now   <- IO.realTime
        state <- IO.ref(0L -> Payload(now.toMillis))
      } yield new Sig {

        def write: IO[Unit] =
          // Read the clock outside of `update`: the update function may be re-run when the
          // compare-and-set is retried, so it has to stay free of side effects.
          IO.realTime.flatMap { now =>
            state.update { case (watermark, _) => (watermark + 1, Payload(now.toMillis)) }
          }

        def read(prevWatermark: Option[Watermark]): IO[Option[(Watermark, Payload)]] =
          state.get.flatMap { case current @ (watermark, _) =>
            prevWatermark match {
              case None => IO.pure(current.some)
              case Some(prev) =>
                if (watermark == prev) IO.none
                else if (watermark == prev + 1) IO.pure(current.some)
                else IO.raiseError(new Exception(s"prevWatermark=$prev, watermark=$watermark"))
            }
          }
      }
  }

  /** Feeds the provided Sig with a timestamp every 500ms.
    *
    * Each iteration waits 500ms, writes, and prints how long the write took. The loop never
    * completes on its own; a message is printed when it is canceled.
    */
  def tsService(sig: Sig): IO[Nothing] =
    sig.write.timed
      .flatMap { case (elapsed, _) =>
        elapsed.toMillis.pipe(t => IO.println(s"Producer published in ${t}ms"))
      }
      .delayBy(500.millis)
      .foreverM
      .onCancel(IO.println("Canceled producer"))

  /** Polls `sig` up to `n` times, pausing 5ms between polls, and records observed latencies.
    *
    * A latency sample (absolute difference between the consumer's clock and the payload
    * timestamp, in milliseconds) is recorded only when a new value is read and `prev` is already
    * defined; the very first observation just establishes the watermark to compare against.
    *
    * @param n    remaining number of polls; the stream ends once this is not positive
    * @param sig  signal to poll
    * @param prev last value this consumer observed, if any
    * @param hist histogram receiving the latency samples
    */
  def responseStream(n: Int, sig: Sig, prev: Option[(Watermark, Payload)], hist: Histogram): IO[Unit] =
    if (n <= 0) IO.unit
    else
      sig.read(prev.map(_._1)).flatMap {
        case fresh @ Some((_, Payload(tsProducer))) =>
          val recordLatency = IO.whenA(prev.isDefined) {
            IO.realTime.flatMap { tsConsumer =>
              // Suspend the histogram mutation so it runs as part of the effect.
              IO(hist.recordValue(math.abs(tsConsumer.toMillis - tsProducer)))
            }
          }
          recordLatency >> responseStream(n - 1, sig, fresh, hist).delayBy(5.millis)
        case None =>
          responseStream(n - 1, sig, prev, hist).delayBy(5.millis)
      }

  /** Runs the workload twice (warmup, then benchmark) and prints the benchmark percentiles. */
  def run: IO[Unit] = {
    // One full workload: a supervised producer plus `n` parallel consumers of 1000 polls each.
    // Leaving the supervisor scope cancels the producer once all consumers are done.
    val doWork: IO[Unit] = Supervisor[IO].use { sup =>
      Sig.create.flatMap { sig =>
        tsService(sig).supervise(sup) >>
          List.fill(n)(responseStream(1000, sig, None, hist)).parSequence_
      }
    }

    IO.println("Warmup") >>
      doWork >>
      IO(hist.reset()) >> // discard warmup samples before the measured run
      IO.sleep(2.seconds) >>
      IO.println("Benchmark") >>
      doWork >>
      IO(hist.outputPercentileDistribution(System.out, 1.0))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment