Skip to content

Instantly share code, notes, and snippets.

import java.util.concurrent.atomic.AtomicLong
import zio._
import zio.clock.Clock
import zio.duration._
import zio.stream.ZStream
object Problem extends App {
/* override val platform: Platform =
Platform.default.withTracing(Tracing.disabled)*/
// Queue[Any]: ZIO keeps its internals private (even the diagnostic trait), so the element type cannot be named more precisely.
class StreamDiagnostics(val queue: Queue[Any]) extends LogSupport {
val map = TrieMap.empty[String, Promise[Throwable, Unit]]
def runWithMonitor[R](tp: TopicPartition, stream: ZStream[R, Throwable, _]): ZIO[R, Throwable, Unit] =
for {
p <- Promise.make[Throwable, Unit]
_ <- queue.offer(StreamControl(tp, p))
fb <- stream