Created
November 15, 2020 13:43
-
-
Save jsfwa/b8838f94716d8ef2c8b5cae3683b772e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import java.util.concurrent.atomic.AtomicLong | |
import zio._ | |
import zio.clock.Clock | |
import zio.duration._ | |
import zio.stream.ZStream | |
/**
 * Small stress harness built on ZIO streams.
 *
 * A fixed set of "job" substreams is pushed through a queue and drained
 * concurrently. Every element a job emits is produced by creating a promise,
 * handing it to a feed queue and waiting for a separate feeder fiber to
 * complete it. Each substream periodically prints its average throughput.
 */
object Problem extends App {
  /*  override val platform: Platform =
    Platform.default.withTracing(Tracing.disabled)*/

  // When to restart substreams
  val interruptRepeatCheckpoint = 1.hour

  // Wall-clock reference (epoch millis) used to compute average throughput.
  val now = System.currentTimeMillis()

  /**
   * Wires the job queue, the feed queue and the two draining fibers together,
   * then blocks on both fibers.
   *
   * @param args command-line arguments (unused)
   * @return the exit code derived from the outcome of the composed effect
   */
  def run(args: List[String]): zio.URIO[zio.ZEnv, ExitCode] = {

    // Builds a stream whose every pull registers a fresh promise on `queue`
    // and then waits for somebody else to complete that promise.
    def newJob(queue: Queue[Promise[Option[Throwable], Chunk[Int]]]) = {
      val task: ZIO[Any, Option[Throwable], Chunk[Int]] = for {
        p <- Promise.make[Option[Throwable], Chunk[Int]]
        _ <- queue.offer(p)
        r <- p.await
      } yield r

      ZStream {
        ZManaged.succeed {
          task
        }
      }
    }

    (for {
      jobQueue <- Queue
        .unbounded[(String, ZStream[Any with Clock, Throwable, Int])]
      feedQueue <- Queue.unbounded[Promise[Option[Throwable], Chunk[Int]]]
      // Actual stream of substreams
      stream = ZStream
        .fromQueue(jobQueue)
        .mapMParUnordered(50) {
          case (i, s) =>
            // Just to affect performance faster
            val stupidCounter = new AtomicLong(0L)
            // Drain the substream, cut it off at the checkpoint, start over.
            val st = s
              .tap(_ => ZIO.succeed(stupidCounter.incrementAndGet()))
              .bufferSliding(1)
              .interruptAfter(interruptRepeatCheckpoint)
              .runDrain
              .repeat(Schedule.forever)
            for {
              _ <- Task {
                // Clamp to one second: the first report runs right after the
                // fork, when fewer than 1000 ms have elapsed, and dividing a
                // Long by 0 throws ArithmeticException, which would fail this
                // effect and end the periodic reporting for good.
                val elapsedSeconds =
                  math.max(1L, (System.currentTimeMillis() - now) / 1000)
                println(
                  s"[Substream-${i}]: ${stupidCounter.get / elapsedSeconds}rps"
                )
              }.repeat(Schedule.spaced(1.minute)).fork
              r <- st
            } yield r
        }
        .runDrain
      // Feed to fulfill promises
      feed = ZStream
        .fromQueue(feedQueue)
        .mapMPar(10) { x =>
          x.succeed(Chunk(1))
        }
        .runDrain
      fb1 <- stream.fork
      fb2 <- feed.fork
      // Enqueue jobs "0" through "10" (eleven substreams in total).
      _ <- Chunk
        .fromIterable(0.to(10))
        .mapM(i => jobQueue.offer(i.toString -> newJob(feedQueue)))
      _ <- fb1.zip(fb2).join
    } yield ()).exitCode
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment