Skip to content

Instantly share code, notes, and snippets.

@jsfwa
Created March 15, 2021 12:51
Show Gist options
  • Save jsfwa/dcb9b75056473396b2c307d00ad67aed to your computer and use it in GitHub Desktop.
Save jsfwa/dcb9b75056473396b2c307d00ad67aed to your computer and use it in GitHub Desktop.
// The queue is typed as Queue[Any] because the relevant zio types (even the diagnostic trait) are private and cannot be referenced here.
/**
 * Keeps one "stop" promise per Kafka topic-partition so that, when a partition is registered
 * again while an earlier stream for it is still registered, the earlier stream can be stopped.
 *
 * @param queue queue that carries [[StreamControl]] events; it is typed as `Queue[Any]` and
 *              every element that is not a `StreamControl` is ignored by [[start]]
 */
class StreamDiagnostics(val queue: Queue[Any]) extends LogSupport {

  /** Stop promises of the currently registered partition streams, keyed by "<topic>-<partition>". */
  val map = TrieMap.empty[String, Promise[Throwable, Unit]]

  /** Builds the registry key for a topic-partition. */
  private def partitionKey(tp: TopicPartition): String =
    s"${tp.topic()}-${tp.partition()}"

  /**
   * Drains `stream` until it ends on its own or its stop promise is completed.
   *
   * A fresh promise is created and announced on `queue` as a `StreamControl`; [[start]] stores it
   * in `map` and completes the promise of any stream previously registered for the same partition.
   *
   * @param tp     topic-partition the stream belongs to
   * @param stream partition stream to drain
   * @return an effect that finishes when the stream has been drained or stopped, failing with the
   *         stream's error if it fails
   */
  def runWithMonitor[R](tp: TopicPartition, stream: ZStream[R, Throwable, _]): ZIO[R, Throwable, Unit] =
    for {
      p <- Promise.make[Throwable, Unit]
      _ <- queue.offer(StreamControl(tp, p))
      _ <- stream
             .interruptWhen(p)
             .runDrain
             // `ensuring` runs on success, failure and interruption alike. The entry is removed only
             // if it still holds this stream's promise, so a newer registration for the same
             // partition is left untouched and a later re-assignment is not reported as a duplicate.
             // NOTE: if the stream ends before `start` has consumed the control event, the entry is
             // inserted afterwards and stays until the partition is registered again.
             .ensuring(ZIO.effectTotal(map.remove(partitionKey(tp), p)) *> p.succeed(()))
    } yield ()

  /**
   * Consumes `queue` until it is shut down, registering the promise of every `StreamControl`
   * event and stopping the stream that was previously registered under the same key.
   *
   * @return an effect that completes once the queue has been shut down
   */
  def start(): ZIO[Any, Throwable, Unit] =
    ZStream
      .fromQueueWithShutdown(queue)
      .mapM {
        // IMPORTANT: do not use the Revoked and Assigned event types provided by zio-kafka to
        // terminate existing streams; they are not reliable for that purpose.
        case e: StreamControl =>
          val key = partitionKey(e.tp)
          // `put` swaps the promise atomically and hands back the previous one, if any.
          ZIO.effectTotal(map.put(key, e.p)).flatMap {
            case Some(previous) =>
              ZIO.effectTotal(
                warn(
                  s"[Diagnostics:StreamControl][$key] Attempt to assign partition, that already assigned, revoking old one"
                )
              ) *> previous.succeed(()).unit
            case None =>
              ZIO.effectTotal(info(s"[Diagnostics:StreamControl][$key] Successfully assigned"))
          }
        case _ => ZIO.unit
      }
      .runDrain
}
/** Companion: the control event type plus constructors for [[StreamDiagnostics]]. */
object StreamDiagnostics {

  /**
   * Control event announcing a partition stream.
   *
   * @param tp topic-partition the stream belongs to
   * @param p  promise that stops the stream when completed
   */
  final case class StreamControl(tp: TopicPartition, p: Promise[Throwable, Unit])

  /**
   * Wraps `queue` in a [[StreamDiagnostics]].
   *
   * `live` relies on this constructor; it was referenced there but never defined.
   *
   * @param queue queue the instance reads control events from
   */
  def make(queue: Queue[Any]): ZIO[Any, Nothing, StreamDiagnostics] =
    ZIO.succeed(new StreamDiagnostics(queue))

  /**
   * Layer providing a [[StreamDiagnostics]] backed by a bounded queue of 128 elements, with
   * `start()` running in the background.
   */
  def live =
    (for {
      diagnosticQueue   <- Queue.bounded[Any](128).toManaged_
      streamDiagnostics <- make(diagnosticQueue).toManaged_
      // `forkManaged` ties the background fiber to the layer: it is interrupted when the layer is
      // released instead of depending on the lifetime of whichever fiber built the layer.
      _                 <- streamDiagnostics.start().forkManaged
    } yield streamDiagnostics).toLayer
}
//Usage example
// Illustrative sketch of wiring StreamDiagnostics into a Kafka consumer. It is not compilable
// as written: `consumerSettings` and the arguments of `subscribeAnd(...)` are placeholders.
object Example{
// Layer that provides the StreamDiagnostics service.
val diagnosticsLayer = StreamDiagnostics.live
// Consumer layer whose diagnostics are published to the queue owned by StreamDiagnostics.
// NOTE(review): `streamDiagnostics` is not defined at this point in the snippet (it is only a
// parameter of `run` below) — presumably this layer is meant to be built where an instance is
// available; confirm before reusing.
val consumerLayer = Consumer
.make(
consumerSettings,
diagnostics = Diagnostics.SlidingQueue(
streamDiagnostics.queue.asInstanceOf[Queue[DiagnosticEvent]]) // cast needed: the queue is declared as Queue[Any] (see the comment at the top of the file)
)
.toLayer
// Passes every (topic-partition, stream) pair to runWithMonitor, using Byte.MaxValue (127) as
// the argument of mapMParUnordered, then drains the outer stream with the consumer layer provided.
def run(streamDiagnostics: StreamDiagnostics) = {
Consumer
.subscribeAnd(...)
.partitionedStream(Serde.long.asTry, Serde.byteArray)
.mapMParUnordered(Byte.MaxValue) {
case (tp, s) =>
streamDiagnostics
.runWithMonitor(tp, s)
}
.runDrain
.provideCustomLayer(consumerLayer)
}
// Somewhere in the application: fetch the StreamDiagnostics service from the environment and
// hand it to `run`.
ZIO.service[StreamDiagnostics] flatMap run
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment