Last active
February 22, 2022 17:24
-
-
Save rossabaker/8872792b06bd84e5be8fae3c9caf8731 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import $ivy.`org.typelevel::munit-cats-effect-3:1.0.7` | |
import $ivy.`org.tpolecat::natchez-mock:0.1.6` | |
import $ivy.`co.fs2::fs2-core:3.2.5` | |
import cats.data.Kleisli | |
import cats.effect.Deferred | |
import cats.effect.IO | |
import cats.effect.IOLocal | |
import cats.effect.MonadCancelThrow | |
import cats.effect.Resource | |
import cats.syntax.all._ | |
import fs2.Pull | |
import fs2.Stream | |
import natchez._ | |
import natchez.mock._ | |
import io.opentracing.mock.{ MockSpan => OTMockSpan } | |
import io.opentracing.mock.MockTracer | |
import java.net.URI | |
import munit.CatsEffectSuite | |
import scala.concurrent.duration._ | |
import scala.jdk.CollectionConverters._ | |
trait TraceResource[F[_]] extends Trace[F] { | |
def resource[A](name: String)(r: Resource[F, A]): Resource[F, A] | |
} | |
object TraceResource { | |
def apply[F[_]](implicit ev: TraceResource[F]): ev.type = ev | |
def ioTrace(rootSpan: Span[IO]): IO[TraceResource[IO]] = | |
IOLocal(rootSpan).map { local => | |
new TraceResource[IO] { | |
def allocated(name: String): IO[(Unit, IO[Unit])] = | |
(for { | |
parent <- Resource.eval(local.get) | |
child <- parent.span(name) | |
_ <- Resource.make(local.set(child))(_ => local.set(parent)) | |
} yield ()).allocated | |
def resource[A](name: String)(r: Resource[IO, A]): Resource[IO, A] = | |
for { | |
parent <- Resource.eval(local.get) | |
child <- parent.span(name) | |
_ <- Resource.make(local.set(child))(_ => local.set(parent)) | |
a <- r | |
} yield a | |
def span[A](name: String)(k: IO[A]): IO[A] = | |
resource(name)(Resource.unit).use(_ => k) | |
// We could be smarter about inheriting these... | |
def put(fields: (String, TraceValue)*): IO[Unit] = | |
local.get.flatMap(_.put(fields: _*)) | |
def kernel: IO[Kernel] = | |
local.get.flatMap(_.kernel) | |
def traceId: IO[Option[String]] = | |
local.get.flatMap(_.traceId) | |
def traceUri: IO[Option[URI]] = | |
local.get.flatMap(_.traceUri) | |
} | |
} | |
import cats.effect.Sync | |
implicit def kleisliInstance[F[_]](implicit ev: Sync[F]): KleisliTraceResource[F] = | |
new KleisliTraceResource[F] | |
class KleisliTraceResource[F[_]](implicit ev: Sync[F]) extends TraceResource[Kleisli[F, Span[F], *]] { | |
def kernel: Kleisli[F, Span[F], Kernel] = | |
Kleisli(_.kernel) | |
def put(fields: (String, TraceValue)*): Kleisli[F, Span[F], Unit] = | |
Kleisli(_.put(fields: _*)) | |
def resource[A](name: String)(r: Resource[Kleisli[F, Span[F], *], A]): Resource[Kleisli[F, Span[F], *], A] = | |
Resource.suspend( | |
Kleisli((span: Span[F]) => | |
span.span(name).flatMap(child => r.mapK(Kleisli.applyK(child))) | |
).mapF(ra => ra.mapK(Kleisli.liftK[F, Span[F]]).pure[F]) | |
) | |
def span[A](name: String)(k: Kleisli[F, Span[F], A]): Kleisli[F,Span[F],A] = | |
Kleisli(_.span(name).use(k.run)) | |
def lens[E](f: E => Span[F], g: (E, Span[F]) => E): Trace[Kleisli[F, E, *]] = | |
new Trace[Kleisli[F, E, *]] { | |
def kernel: Kleisli[F,E,Kernel] = | |
Kleisli(e => f(e).kernel) | |
def put(fields: (String, TraceValue)*): Kleisli[F,E,Unit] = | |
Kleisli(e => f(e).put(fields: _*)) | |
def span[A](name: String)(k: Kleisli[F, E, A]): Kleisli[F, E, A] = | |
Kleisli(e => f(e).span(name).use(s => k.run(g(e, s)))) | |
def traceId: Kleisli[F,E,Option[String]] = | |
Kleisli(e => f(e).traceId) | |
def traceUri: Kleisli[F,E,Option[URI]] = | |
Kleisli(e => f(e).traceUri) | |
} | |
def traceId: Kleisli[F,Span[F],Option[String]] = | |
Kleisli(_.traceId) | |
def traceUri: Kleisli[F,Span[F],Option[URI]] = | |
Kleisli(_.traceUri) | |
} | |
} | |
class ClientTracingSuite extends CatsEffectSuite { | |
def lookupSpan(tracer: MockTracer, name: String): IO[OTMockSpan] = | |
IO.delay(tracer.finishedSpans.asScala.find(_.operationName === name)).flatMap { | |
case Some(span) => IO.pure(span) | |
case None => IO.raiseError(new NoSuchElementException(s"no span named ${name}")) | |
} | |
case class Request(editor: String) | |
case class Response(opinion: String) | |
type Client[F[_]] = Request => Resource[F, Response] | |
def client[F[_]]: Client[F] = { req => | |
req match { | |
case Request("emacs") => Resource.pure(Response("good")) | |
case _ => Resource.pure(Response("bad")) | |
} | |
} | |
def tracedClient[F[_]: MonadCancelThrow: TraceResource](client: Client[F]): Client[F] = { req => | |
TraceResource[F].resource("client")( | |
Resource( | |
Trace[F].span("acquire")(client(req).allocated).map { case (resp, release) => | |
(resp, Trace[F].span("release")(release)) | |
} | |
) | |
) | |
} | |
test("client tracing") { | |
val tracer = new MockTracer | |
for { | |
root <- IO(MockSpan[IO](tracer, tracer.buildSpan("root").start())) | |
_ <- TraceResource.ioTrace(root).flatMap { implicit trace => | |
val req = Request("emacs") | |
tracedClient[IO](client).apply(req).use { resp => | |
// ambient span here is client. Create child spans to our heart's delight | |
trace.span("use") { | |
trace.put(req.editor -> resp.opinion) | |
} | |
} | |
} | |
_ <- IO(root.span.finish()) | |
clientSpan <- lookupSpan(tracer, "client") | |
acquireSpan <- lookupSpan(tracer, "acquire") | |
_ = assertEquals(acquireSpan.parentId, clientSpan.context.spanId) | |
releaseSpan <- lookupSpan(tracer, "release") | |
_ = assertEquals(releaseSpan.parentId, clientSpan.context.spanId) | |
useSpan <- lookupSpan(tracer, "use") | |
_ = assertEquals(useSpan.parentId, clientSpan.context.spanId) | |
_ = assertEquals(useSpan.tags.asScala.get("emacs"), Some("good")) | |
} yield () | |
} | |
test("client tracing -- kleisli") { | |
val tracer = new MockTracer | |
val k = for { | |
root <- Kleisli.ask[IO, Span[IO]] | |
trace = Trace[Kleisli[IO, Span[IO], *]] | |
req = Request("emacs") | |
_ <- tracedClient[Kleisli[IO, Span[IO], *]](client).apply(req).use { resp => | |
// ambient span here is client. Create child spans to our heart's delight | |
trace.span("use") { | |
trace.put(req.editor -> resp.opinion) | |
} | |
} | |
} yield () | |
for { | |
root <- IO(MockSpan[IO](tracer, tracer.buildSpan("root").start())) | |
_ <- k.run(root) | |
_ <- IO(root.span.finish()) | |
clientSpan <- lookupSpan(tracer, "client") | |
acquireSpan <- lookupSpan(tracer, "acquire") | |
_ = assertEquals(acquireSpan.parentId, clientSpan.context.spanId) | |
releaseSpan <- lookupSpan(tracer, "release") | |
_ = assertEquals(releaseSpan.parentId, clientSpan.context.spanId) | |
useSpan <- lookupSpan(tracer, "use") | |
_ = assert(clientSpan.startMicros <= acquireSpan.startMicros) | |
_ = assert(clientSpan.finishMicros >= releaseSpan.finishMicros) | |
_ = assertEquals(useSpan.parentId, root.span.context.spanId) | |
_ = assertEquals(useSpan.tags.asScala.get("emacs"), Some("good")) | |
} yield () | |
} | |
test("asynchronous") { | |
val tracer = new MockTracer | |
for { | |
root <- IO(MockSpan[IO](tracer, tracer.buildSpan("root").start())) | |
done <- Deferred[IO, Unit] | |
_ <- Trace.ioTrace(root).flatMap { trace => | |
trace.span("a") { | |
(IO.sleep(100.millis) *> | |
trace.span("b")(done.complete(())) | |
).start | |
} *> | |
trace.put("ambient" -> "root") | |
} | |
_ <- done.get | |
_ <- IO(root.span.finish()) | |
a <- lookupSpan(tracer, "a") | |
b <- lookupSpan(tracer, "b") | |
_ = assertEquals(b.parentId, a.context.spanId) | |
_ = assert(b.startMicros > a.finishMicros) | |
_ = assertEquals(root.span.tags.asScala.get("ambient"), Some("root")) | |
} yield () | |
} | |
test("weird and misnested") { | |
val tracer = new MockTracer | |
for { | |
root <- IO(MockSpan[IO](tracer, tracer.buildSpan("root").start())) | |
_ <- TraceResource.ioTrace(root).flatMap { trace => | |
for { | |
a <- trace.resource("a")(Resource.unit).allocated | |
b <- trace.resource("b")(Resource.unit).allocated | |
_ <- a._2 | |
_ <- b._2 | |
// a is finished, and is now the ambient span. We've lost root. | |
c <- trace.resource("c")(Resource.unit).use_ | |
} yield () | |
} | |
a <- lookupSpan(tracer, "a") | |
c <- lookupSpan(tracer, "c") | |
_ <- IO(root.span.finish()) | |
_ = assertEquals(c.parentId, a.context.spanId) | |
} yield () | |
} | |
test("stream") { | |
val tracer = new MockTracer | |
for { | |
root <- IO(MockSpan[IO](tracer, tracer.buildSpan("root").start())) | |
_ <- TraceResource.ioTrace(root).flatMap { trace => | |
(for { | |
_ <- Stream.resource(trace.resource("stream")(Resource.unit)) | |
s <- Stream(1, 2, 3).foldMonoid | |
_ <- Stream.eval(trace.put("sum" -> s)) | |
} yield ()).compile.drain | |
} | |
stream <- lookupSpan(tracer, "stream") | |
_ = assertEquals(stream.parentId, root.span.context.spanId) | |
_ = assertEquals(stream.tags.asScala.get("sum"), Some(6)) | |
} yield () | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment