Created
March 15, 2021 12:51
-
-
Save jsfwa/dcb9b75056473396b2c307d00ad67aed to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// The queue is typed as `Any` because the relevant zio diagnostic types (even the diagnostic trait) are private and cannot be referenced here.
/**
 * Keeps one "stop" promise per topic-partition so that a stream which is still
 * running for a partition can be interrupted when the same partition is
 * registered again.
 *
 * @param queue queue on which [[runWithMonitor]] publishes `StreamControl`
 *              messages and from which [[start]] consumes them. It is typed
 *              `Queue[Any]` because other event types may be offered to the same
 *              queue (see the usage example); anything that is not a
 *              `StreamControl` is ignored by [[start]].
 */
class StreamDiagnostics(val queue: Queue[Any]) extends LogSupport {

  /** Promise of the most recently registered stream, keyed by "<topic>-<partition>". */
  val map: TrieMap[String, Promise[Throwable, Unit]] =
    TrieMap.empty[String, Promise[Throwable, Unit]]

  /**
   * Drains `stream` while making it interruptible through a freshly created promise.
   *
   * The promise is published on `queue` inside a `StreamControl` so that the loop
   * started by [[start]] can complete it later, which interrupts the stream via
   * `interruptWhen`.
   *
   * @param tp     topic-partition the stream belongs to
   * @param stream the partition stream to drain
   * @return an effect that completes once the stream has been drained or interrupted
   */
  def runWithMonitor[R](tp: TopicPartition, stream: ZStream[R, Throwable, _]): ZIO[R, Throwable, Unit] =
    for {
      p <- Promise.make[Throwable, Unit]
      _ <- queue.offer(StreamControl(tp, p))
      fb <- stream
              .interruptWhen(p)
              .runDrain
              // Complete the promise on success as well as on failure.
              .tap(_ => p.succeed(()))
              .tapCause(_ => p.succeed(()))
              .fork
      _ <- fb.join
    } yield ()

  /**
   * Consumes `queue` and maintains `map`.
   *
   * For every `StreamControl` the new promise is stored under the partition key;
   * if a promise was already stored for that key it is completed, which
   * interrupts the stream that registered it. All other queue elements are ignored.
   *
   * @return an effect that runs until the queue-backed stream terminates
   */
  def start(): ZIO[Any, Throwable, Unit] =
    ZStream
      .fromQueueWithShutdown(queue)
      .mapM {
        // IMPORTANT (original author's warning): do not rely on the Revoked and
        // Assigned events provided by zio-kafka to terminate existing streams;
        // the author found them unreliable for that purpose.
        case e: StreamControl =>
          val key = s"${e.tp.topic()}-${e.tp.partition()}"
          // `put` swaps in the new promise and hands back the previous one in a
          // single atomic step, so the key is never observed as missing in between.
          map.put(key, e.p) match {
            case Some(previous) =>
              warn(
                s"[Diagnostics:StreamControl][$key] Attempt to assign partition, that already assigned, revoking old one"
              )
              previous.succeed(())
            case None =>
              info(s"[Diagnostics:StreamControl][$key] Successfully assigned")
              Task.unit
          }
        case _ => ZIO.unit
      }
      .runDrain
}
object StreamDiagnostics {

  /**
   * Control message published by `runWithMonitor`.
   *
   * @param tp topic-partition whose stream is being started
   * @param p  promise that, once completed, interrupts that stream
   */
  case class StreamControl(tp: TopicPartition, p: Promise[Throwable, Unit])

  /**
   * Wraps construction of a [[StreamDiagnostics]] in an effect.
   *
   * `live` calls `StreamDiagnostics.make`, which was not defined anywhere in the
   * companion; this provides it.
   *
   * @param queue queue shared between the diagnostics loop and the stream runners
   */
  def make(queue: Queue[Any]): ZIO[Any, Nothing, StreamDiagnostics] =
    ZIO.succeed(new StreamDiagnostics(queue))

  /**
   * Layer that creates a bounded queue (capacity 128), builds the diagnostics
   * service on top of it and forks its processing loop.
   */
  def live = (for {
    diagnosticQueue   <- Queue.bounded[Any](128)
    streamDiagnostics <- StreamDiagnostics.make(diagnosticQueue)
    // NOTE(review): the loop is started with a plain `fork`, so its lifetime is
    // tied to the fiber that builds this layer; confirm that this is intended
    // (a daemon or managed fork may be what is wanted here).
    _ <- streamDiagnostics.start().fork
  } yield streamDiagnostics).toLayer
}
//Usage example | |
/**
 * Usage sketch: wires [[StreamDiagnostics]] into a zio-kafka consumer.
 *
 * `consumerSettings` is expected to be supplied by the surrounding application.
 */
object Example {

  val diagnosticsLayer = StreamDiagnostics.live

  /**
   * Consumer layer whose diagnostics queue is the very queue used by
   * `streamDiagnostics`.
   *
   * This is a method rather than a `val` because it needs the diagnostics
   * instance, which is only available as a parameter.
   *
   * The cast is required because the shared queue is declared as `Queue[Any]`.
   */
  def consumerLayer(streamDiagnostics: StreamDiagnostics) =
    Consumer
      .make(
        consumerSettings,
        diagnostics = Diagnostics.SlidingQueue(
          streamDiagnostics.queue.asInstanceOf[Queue[DiagnosticEvent]]
        )
      )
      .toLayer

  /**
   * Consumes every partition stream under the supervision of `streamDiagnostics`.
   *
   * @param streamDiagnostics service used to register and monitor each partition stream
   */
  def run(streamDiagnostics: StreamDiagnostics) =
    Consumer
      .subscribeAnd(???) // TODO: supply the subscription to consume (left open in the original example)
      .partitionedStream(Serde.long.asTry, Serde.byteArray)
      .mapMParUnordered(Byte.MaxValue) {
        case (tp, s) =>
          streamDiagnostics.runWithMonitor(tp, s)
      }
      .runDrain
      .provideCustomLayer(consumerLayer(streamDiagnostics))

  /** Somewhere in the application: obtain the service from the environment and run. */
  val program = ZIO.service[StreamDiagnostics].flatMap(run)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment