-
-
Save narma/4b1898ca849dc2582bc506d4452df8cb to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
//Any - because everything in zio is private even diagnostic trait | |
class StreamDiagnostics(val queue: Queue[Any]) extends LogSupport { | |
val map = TrieMap.empty[String, Promise[Throwable, Unit]] | |
def runWithMonitor[R](tp: TopicPartition, stream: ZStream[R, Throwable, _]): ZIO[R, Throwable, Unit] = | |
for { | |
p <- Promise.make[Throwable, Unit] | |
_ <- queue.offer(StreamControl(tp, p)) | |
fb <- stream | |
.interruptWhen(p) | |
.runDrain.tap(_ => p.succeed(())).tapCause(_ => p.succeed(())) | |
.fork | |
_ <- fb.join | |
} yield () | |
def start() = | |
ZStream | |
.fromQueueWithShutdown(queue) | |
.mapM { | |
//IMPORTANT! Do not use Revoked and Assigned types provided by zio-kafka for termination of existings streams | |
case e: StreamControl => | |
val key = s"${e.tp.topic()}-${e.tp.partition()}" | |
map.remove(key) match { | |
case Some(v) => | |
warn( | |
s"[Diagnostics:StreamControl][$key] Attempt to assign partition, that already assigned, revoking old one" | |
) | |
map.update(key, e.p) | |
v.succeed(()) | |
case _ => | |
info(s"[Diagnostics:StreamControl][$key] Successfully assigned") | |
map.update(key, e.p) | |
Task.unit | |
} | |
case _ => ZIO.unit | |
} | |
.runDrain | |
} | |
object StreamDiagnostics{ | |
case class StreamControl(tp: TopicPartition, p: Promise[Throwable, Unit]) | |
def live = (for { | |
diagnosticQueue <- Queue.bounded[Any](128) | |
streamDiagnostics <- StreamDiagnostics.make(diagnosticQueue) | |
_ <- streamDiagnostics.start().fork | |
} yield streamDiagnostics).toLayer | |
} | |
//Usage example | |
object Example{ | |
val diagnosticsLayer = StreamDiagnostics.live | |
val consumerLayer = Consumer | |
.make( | |
consumerSettings, | |
diagnostics = Diagnostics.SlidingQueue( | |
streamDiagnostics.queue.asInstanceOf[Queue[DiagnosticEvent]]) //Reasoning? Check top comment^ | |
) | |
.toLayer | |
def run(streamDiagnostics: StreamDiagnostics) = { | |
Consumer | |
.subscribeAnd(...) | |
.partitionedStream(Serde.long.asTry, Serde.byteArray) | |
.mapMParUnordered(Byte.MaxValue) { | |
case (tp, s) => | |
streamDiagnostics | |
.runWithMonitor(tp, s) | |
} | |
.runDrain | |
.provideCustomLayer(consumerLayer) | |
} | |
//Somewhere | |
ZIO.service[StreamDiagnostics] flatMap run | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment