Skip to content

Instantly share code, notes, and snippets.

@ryanmiville
Last active March 8, 2022 17:57
Show Gist options
  • Save ryanmiville/71d06f76dfe7c62ab96f342f5bed72b5 to your computer and use it in GitHub Desktop.
Save ryanmiville/71d06f76dfe7c62ab96f342f5bed72b5 to your computer and use it in GitHub Desktop.
Strange child span behavior with merging streams
//> using scala "2.13"
//> using lib "com.armanbilge::bayou:0.1-4fb42c8"
//> using lib "org.typelevel::cats-effect:3.3.7"
//> using lib "co.fs2::fs2-core:3.2.5"
//> using lib "org.typelevel::log4cats-core:2.2.0"
//> using lib "org.typelevel::log4cats-slf4j:2.2.0"
//> using lib "org.tpolecat::natchez-core:0.1.6"
//> using lib "org.tpolecat::natchez-log:0.1.6"
import cats.effect.{Trace => _, _}
import cats.implicits._
import fs2._
import bayou.Trace
import bayou.Trace._
import natchez.log.Log
import natchez.{Trace => _, _}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import scala.concurrent.duration._
import java.net.URI
/** Entry point that runs the traced `stream` from `Setup` merged with a never-ending
  * stream of unit values.
  *
  * The root trace resource is held open for the whole run; the merged stream halts as
  * soon as either side halts, and the last emitted element is the program's result.
  */
object Merge extends IOApp.Simple with Setup {

  /** An endless stream that evaluates `IO.unit` over and over. */
  def other: Stream[IO, Unit] = Stream.eval(IO.unit).repeat

  /** Acquires the root trace, then compiles the merge of `stream` and `other`. */
  def run: IO[Unit] =
    trace.use { implicit rootTrace =>
      // `stream` picks up `rootTrace` through its implicit IOTrace parameter.
      val merged = stream.mergeHaltBoth(other)
      merged.compile.lastOrError
    }
}
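/**
Observed log output (presumably produced by running `Merge` above). Note that every
"new_root" span is logged with an empty "children" list, while all "child" spans and the
single "vals" field ("10") are attached to "root".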
[info] {
"name" : "new_root",
"service" : "example",
"timestamp" : "2022-03-08T17:35:36.890081Z",
"duration_ms" : 1069,
"trace.span_id" : "61f856ad-4fee-49d2-a2f7-a68f6eeed822",
"trace.parent_id" : null,
"trace.trace_id" : "bcf45bda-1981-498e-bb56-9346761957e5",
"exit.case" : "succeeded",
"children" : [
]
}
[info] {
"name" : "new_root",
"service" : "example",
"timestamp" : "2022-03-08T17:35:37.972649Z",
"duration_ms" : 1006,
"trace.span_id" : "e2ba4bc9-3df9-4791-953e-90a03118514d",
"trace.parent_id" : null,
"trace.trace_id" : "03352921-3489-4338-a4ce-1ef519c3c2e5",
"exit.case" : "succeeded",
"children" : [
]
}
[info] {
"name" : "new_root",
"service" : "example",
"timestamp" : "2022-03-08T17:35:38.979731Z",
"duration_ms" : 1004,
"trace.span_id" : "3ede0654-76e9-4bd5-8fc0-2eccf711e614",
"trace.parent_id" : null,
"trace.trace_id" : "19704335-7843-405e-b34b-e4d42c23eb12",
"exit.case" : "succeeded",
"children" : [
]
}
[info] {
"name" : "new_root",
"service" : "example",
"timestamp" : "2022-03-08T17:35:39.984235Z",
"duration_ms" : 1001,
"trace.span_id" : "de5feeb3-3782-4010-859c-084f08b0e8a9",
"trace.parent_id" : null,
"trace.trace_id" : "1d911e8a-3cd1-4a2c-9ada-bdfc3e0343ff",
"exit.case" : "succeeded",
"children" : [
]
}
[info] {
"name" : "root",
"service" : "example",
"timestamp" : "2022-03-08T17:35:36.327727Z",
"duration_ms" : 4666,
"trace.span_id" : "78809f24-9fd1-43dc-9c8f-80ac752d44fb",
"trace.parent_id" : null,
"trace.trace_id" : "5374106f-5983-43ad-8dd3-a8a6738668dc",
"exit.case" : "succeeded",
"vals" : "10",
"children" : [
{
"name" : "child",
"service" : "example",
"timestamp" : "2022-03-08T17:35:36.900349Z",
"duration_ms" : 1034,
"trace.span_id" : "78e3aa55-6560-4a41-b8bc-08749b2008f4",
"trace.parent_id" : "5374106f-5983-43ad-8dd3-a8a6738668dc",
"trace.trace_id" : "5374106f-5983-43ad-8dd3-a8a6738668dc",
"exit.case" : "succeeded",
"children" : [
]
},
{
"name" : "child",
"service" : "example",
"timestamp" : "2022-03-08T17:35:37.973723Z",
"duration_ms" : 1004,
"trace.span_id" : "6f4e82e3-1a8c-4f75-9f20-d1cfb1e04b8f",
"trace.parent_id" : "5374106f-5983-43ad-8dd3-a8a6738668dc",
"trace.trace_id" : "5374106f-5983-43ad-8dd3-a8a6738668dc",
"exit.case" : "succeeded",
"children" : [
]
},
{
"name" : "child",
"service" : "example",
"timestamp" : "2022-03-08T17:35:38.980600Z",
"duration_ms" : 1003,
"trace.span_id" : "e651e91f-0258-4338-8792-082a4f2c494a",
"trace.parent_id" : "5374106f-5983-43ad-8dd3-a8a6738668dc",
"trace.trace_id" : "5374106f-5983-43ad-8dd3-a8a6738668dc",
"exit.case" : "succeeded",
"children" : [
]
},
{
"name" : "child",
"service" : "example",
"timestamp" : "2022-03-08T17:35:39.984715Z",
"duration_ms" : 1001,
"trace.span_id" : "745ed640-72e1-4e7e-ad97-4d592cf6fb59",
"trace.parent_id" : "5374106f-5983-43ad-8dd3-a8a6738668dc",
"trace.trace_id" : "5374106f-5983-43ad-8dd3-a8a6738668dc",
"exit.case" : "succeeded",
"children" : [
]
}
]
}
*/
//> using scala "2.13"
//> using lib "com.armanbilge::bayou:0.1-4fb42c8"
//> using lib "org.typelevel::cats-effect:3.3.7"
//> using lib "co.fs2::fs2-core:3.2.5"
//> using lib "org.typelevel::log4cats-core:2.2.0"
//> using lib "org.typelevel::log4cats-slf4j:2.2.0"
//> using lib "org.tpolecat::natchez-core:0.1.6"
//> using lib "org.tpolecat::natchez-log:0.1.6"
import cats.effect.{Trace => _, _}
import cats.implicits._
import fs2._
import bayou.Trace
import bayou.Trace._
import natchez.log.Log
import natchez.{Trace => _, _}
import org.typelevel.log4cats.Logger
import org.typelevel.log4cats.slf4j.Slf4jLogger
import scala.concurrent.duration._
import java.net.URI
/** Shared fixtures for the example: a logging entry point, a root trace resource,
  * the traced stream under test, and a console-backed `Logger[IO]`.
  */
trait Setup {

  /** natchez `Log` entry point for the service named "example". */
  val entryPoint = Log.entryPoint[IO]("example")

  /** Resource that opens a root span called "root" and builds a trace from it
    * with `Trace.ioTrace`.
    */
  val trace = entryPoint.root("root").evalMap(rootSpan => Trace.ioTrace(rootSpan))

  /** Emits one `Unit` per chunk of the numbers 1 to 10, taken in chunks of up to three.
    *
    * For every chunk the stream, in this order:
    *   1. opens a brand-new root span named "new_root" from `entryPoint`,
    *   2. hands that span to `trace.spanR`,
    *   3. opens a span named "child" through `trace.spanR`,
    *   4. puts a "vals" field holding the chunk's values joined with commas,
    *   5. sleeps for one second.
    *
    * @param trace the trace used for the span and field operations
    */
  def stream(implicit trace: IOTrace): Stream[IO, Unit] = {
    // Work performed for a single chunk; the steps are nested so that each resource
    // stays open while the steps after it run.
    def tracedChunk(values: Chunk[Int]): Stream[IO, Unit] =
      Stream.resource(entryPoint.root("new_root")).flatMap { freshRoot =>
        Stream.resource(trace.spanR(freshRoot)) >> (
          Stream.resource(trace.spanR("child")) >> (
            Stream.eval(trace.put("vals" -> values.toList.mkString(","))) >>
              Stream.sleep[IO](1.second)
          )
        )
      }

    Stream
      .emits(1 to 10)
      .covary[IO]
      .chunkN(3)
      .flatMap(tracedChunk)
  }

  /** Logger that prints every message to standard output, prefixed with its level.
    *
    * The overloads taking a `Throwable` print the throwable's message after the log
    * message; no stack trace is printed.
    *
    * NOTE(review): this is a `def`, so a new instance is built on each implicit lookup.
    * `entryPoint` is initialised as a trait `val` and presumably needs this logger
    * implicitly — confirm initialisation order before turning this into a `val`.
    */
  implicit def logger: Logger[IO] =
    new Logger[IO] {
      // Prints "[level] message" followed by an extra newline.
      private def say(level: String, message: => String): IO[Unit] =
        IO.println(s"[$level] $message\n")

      // Prints "[level] message", a newline, then the throwable's message.
      private def sayWithCause(level: String, t: Throwable, message: => String): IO[Unit] =
        IO.println(s"[$level] $message\n${t.getMessage}")

      def error(message: => String): IO[Unit] = say("error", message)
      def warn(message: => String): IO[Unit] = say("warn", message)
      def info(message: => String): IO[Unit] = say("info", message)
      def debug(message: => String): IO[Unit] = say("debug", message)
      def trace(message: => String): IO[Unit] = say("trace", message)

      def error(t: Throwable)(message: => String): IO[Unit] = sayWithCause("error", t, message)
      def warn(t: Throwable)(message: => String): IO[Unit] = sayWithCause("warn", t, message)
      def info(t: Throwable)(message: => String): IO[Unit] = sayWithCause("info", t, message)
      def debug(t: Throwable)(message: => String): IO[Unit] = sayWithCause("debug", t, message)
      def trace(t: Throwable)(message: => String): IO[Unit] = sayWithCause("trace", t, message)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment