Skip to content

Instantly share code, notes, and snippets.

@rossabaker
Last active December 29, 2023 17:51
Show Gist options
  • Save rossabaker/b7ad1d11f7d12c58b82a1144c64de0be to your computer and use it in GitHub Desktop.
Save rossabaker/b7ad1d11f7d12c58b82a1144c64de0be to your computer and use it in GitHub Desktop.
/.scala-build/
docker run --name jaeger \
-e COLLECTOR_OTLP_ENABLED=true \
-p 16686:16686 \
-p 4317:4317 \
-p 4318:4318 \
jaegertracing/all-in-one:1.35
//> using scala 2.13
//> using dep io.opentelemetry:opentelemetry-exporter-otlp:1.33.0
//> using dep io.opentelemetry:opentelemetry-sdk-extension-autoconfigure:1.33.0
//> using dep org.tpolecat::natchez-opentelemetry:0.3.5
//> using dep co.fs2::fs2-core:3.9.3
//> using dep org.slf4j:slf4j-simple:2.0.9
//> using `java-opt` "-Dotel.java.global-autoconfigure.enabled=true"
//> using `java-opt` "-Dotel.service.name=natchez-example"
//> using `java-opt` "-Dotel.metrics.exporter=none"
//> using `java-opt` "-Dotel.propagators=tracecontext"
import cats._
import cats.effect.{Trace => _, _}
import cats.effect.std._
import cats.effect.syntax.all._
import cats.syntax.all._
import fs2._
import fs2.concurrent._
import _root_.io.opentelemetry.api.GlobalOpenTelemetry
import natchez._
import natchez.opentelemetry._
import natchez.Trace.Implicits.noop
import scala.concurrent.duration._
object Main extends IOApp.Simple {
trait Producer[F[_], A] {
def send(a: A): F[F[Unit]]
}
object Producer {
def apply[F[_]: Async: Parallel: Console: Trace, A](
batchSize: Int,
linger: FiniteDuration,
writeLatency: FiniteDuration
): Resource[F, Producer[F, A]] = {
case class Message(a: A, acked: Deferred[F, Unit])
for {
buffer <- Resource.eval(Queue.unbounded[F, Message])
supervisor <- Supervisor[F](true)
fiber <- Async[F].background(
Stream.fromQueueUnterminated(buffer)
.groupWithin(batchSize, linger)
.through(_.evalMap(
chunk =>
Temporal[F].sleep(writeLatency) >>
chunk.parTraverse_(msg =>
Console[F].println("Acking "+msg.a) >>
msg.acked.complete(())
)
))
.compile
.drain)
} yield new Producer[F, A] {
def send(a: A): F[F[Unit]] =
for {
acked <- Deferred[F, Unit]
kernel <- Trace[F].span("send")(
buffer.offer(Message(a, acked)) >>
Trace[F].kernel.flatTap(k => Console[F].println(k))
)
} yield Trace[F].span("ack", Span.Options.parentKernel(kernel))(acked.get)
}
}
}
def run = {
val batch = (1 to 1000).toList
IO(GlobalOpenTelemetry.get).flatMap(otel =>
OpenTelemetry.entryPointFor[IO](otel).flatMap(entryPoint =>
Trace.ioTraceForEntryPoint(entryPoint).flatMap { implicit t =>
Producer[IO, Int](
batchSize = 200,
linger = 100.millis,
writeLatency = 10.millis
).use(producer =>
t.span("batch")(
Supervisor[IO](true).use { supervisor =>
for {
latch <- CountDownLatch[IO](batch.size)
_ <- batch.traverse_(producer.send(_).flatMap(ack =>
supervisor.supervise(ack >> latch.release)
))
_ <- latch.await
} yield ()
}
)
)
}
)
)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment