Skip to content

Instantly share code, notes, and snippets.

@EtaCassiopeia
Created September 16, 2021 14:47
Show Gist options
  • Save EtaCassiopeia/bbba4cedf1b0060a08c1c3a53f332555 to your computer and use it in GitHub Desktop.
Save EtaCassiopeia/bbba4cedf1b0060a08c1c3a53f332555 to your computer and use it in GitHub Desktop.
/**
 * Exercise spec: pushes 100 small tasks through a hand-rolled, STM-based bulkhead
 * and checks that every one of them ran to completion.
 */
object Graduation extends DefaultRunnableSpec {
  def spec =
    suite("Graduation") {
      test("bulkhead") {
        // Environment each queued task needs. Named `TaskEnv` rather than `Task`
        // so it does not shadow ZIO's own `Task` type alias inside this test.
        type TaskEnv = Has[Console] with Has[Live]

        // One unit of work: sleep 100 ms inside `Live.live`, then print a dot.
        // NOTE(review): presumably `Live.live` is used so the sleep runs against the
        // real clock instead of the test clock (which would never advance) - confirm.
        val task = Live.live(ZIO.sleep(100.millis)) *> Console.print(".")

        /**
         * Limits how many effects may be in flight at once.
         *
         * @param ref          transactional count of permits currently held
         * @param maxInProcess maximum number of permits that may be held at the same time
         */
        final case class Bulkhead(ref: TRef[Int], maxInProcess: Int) {

          /** Retries the transaction until a permit is free, then takes it in the same transaction. */
          def acquire: UIO[Unit] =
            (ref.get.retryUntil(_ < maxInProcess) *> ref.update(_ + 1)).commit

          /** Gives one permit back. Must only be called after a successful `acquire`. */
          def release: UIO[Unit] =
            ref.update(_ - 1).commit

          /**
           * Runs `effect` while holding one permit.
           *
           * The finalizer is attached only after `acquire` has completed, so a caller that is
           * interrupted while still waiting for a permit never releases one it does not hold.
           * Wrapping `acquire` itself in `ensuring(release)` would decrement the counter in
           * that case and let more than `maxInProcess` effects run concurrently.
           * `acquire` and `effect` are both run through `restore`, so each keeps the
           * interruptibility the caller had; only the hand-over between them is protected.
           */
          def withPermit[R, E, A](effect: ZIO[R, E, A]): ZIO[R, E, A] =
            ZIO.uninterruptibleMask { restore =>
              restore(acquire) *> restore(effect).ensuring(release)
            }
        }

        for {
          ref      <- TRef.make(0).commit
          bulkhead  = Bulkhead(ref, 10)
          counter  <- Ref.make[Int](0)
          // Capacity equals the number of offers below, so the dropping queue discards nothing
          // and each of the 100 workers finds exactly one task to take.
          taskQueue <- Queue.dropping[RIO[TaskEnv, Unit]](100)
          _         <- ZIO.foreachParDiscard(1 to 100)(_ => taskQueue.offer(task))
          // Each worker takes one task, runs it under a bulkhead permit and bumps the counter.
          _ <- ZIO.foreachParDiscard(1 to 100) { _ =>
                 bulkhead.withPermit(taskQueue.take.flatten *> counter.update(_ + 1))
               }
          count <- counter.get
        } yield assertTrue(count == 100)
      }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment