Skip to content

Instantly share code, notes, and snippets.

@jsfwa
Created November 15, 2020 13:43
Show Gist options
  • Save jsfwa/b8838f94716d8ef2c8b5cae3683b772e to your computer and use it in GitHub Desktop.
Save jsfwa/b8838f94716d8ef2c8b5cae3683b772e to your computer and use it in GitHub Desktop.
import java.util.concurrent.atomic.AtomicLong
import zio._
import zio.clock.Clock
import zio.duration._
import zio.stream.ZStream
object Problem extends App {
/* override val platform: Platform =
Platform.default.withTracing(Tracing.disabled)*/
//When to restart substreams
val interruptRepeatCheckpoint = 1.hour
val now = System.currentTimeMillis()
def run(args: List[String]): zio.URIO[zio.ZEnv, ExitCode] = {
def newJob(queue: Queue[Promise[Option[Throwable], Chunk[Int]]]) = {
val task: ZIO[Any, Option[Throwable], Chunk[Int]] = for {
p <- Promise.make[Option[Throwable], Chunk[Int]]
_ <- queue.offer(p)
r <- p.await
} yield r
ZStream {
ZManaged.succeed {
task
}
}
}
(for {
jobQueue <- Queue
.unbounded[(String, ZStream[Any with Clock, Throwable, Int])]
feedQueue <- Queue.unbounded[Promise[Option[Throwable], Chunk[Int]]]
//Actual stream of substreams
stream = ZStream
.fromQueue(jobQueue)
.mapMParUnordered(50) {
case (i, s) =>
//Just to affect performance faster
val stupidCounter = new AtomicLong(0L)
val st = s
.tap(_ => ZIO.succeed(stupidCounter.incrementAndGet()))
.bufferSliding(1)
.interruptAfter(interruptRepeatCheckpoint)
.runDrain
.repeat(Schedule.forever)
for {
_ <- Task {
println(s"[Substream-${i}]: ${stupidCounter.get / ((System
.currentTimeMillis() - now) / 1000)}rps")
}.repeat(Schedule.spaced(1.minute)).fork
r <- st
} yield r
}
.runDrain
//Feed to fullfill promises
feed = ZStream
.fromQueue(feedQueue)
.mapMPar(10) { x =>
x.succeed(Chunk(1))
}
.runDrain
fb1 <- stream.fork
fb2 <- feed.fork
_ <- Chunk
.fromIterable(0.to(10))
.mapM(i => jobQueue.offer(i.toString -> newJob(feedQueue)))
_ <- fb1.zip(fb2).join
} yield ()).exitCode
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment