// Load Balancer with Graceful Shutdown
// Source: GitHub gist khajavi/da614617b1ffe54530dd45d68c7ccce6 (by @khajavi, created October 6, 2025 09:21).
import zio._
/**
 * Distributes work items of type `A` across a pool of workers.
 *
 * The implementations in the companion object hand items to per-worker
 * queues in round-robin order.
 *
 * @tparam A the type of a unit of work
 */
trait LoadBalancer[A] {
/**
 * Hands `work` to the balancer for asynchronous processing.
 *
 * The implementations in this file fail with an `IllegalStateException`
 * once a shutdown has been requested.
 */
def submit(work: A): Task[Unit]
/**
 * Stops the balancer immediately: the implementations in this file shut
 * down the worker queues and interrupt the worker fibers without waiting
 * for queued work.
 */
def shutdown: Task[Unit]
/**
 * Stops accepting new work and waits up to `timeout` for the workers to
 * finish before interrupting them.
 *
 * @param timeout the maximum time to wait before workers are interrupted
 */
def shutdownGracefully(timeout: Duration): Task[Unit]
}
object LoadBalancer {
/**
 * Creates a round-robin load balancer backed by `workerCount` daemon worker
 * fibers, each consuming its own unbounded queue.
 *
 * Each queue carries `Option[A]`: `Some(work)` is a job, `None` is a stop
 * signal that makes the worker exit once everything queued before it has
 * been processed. This is what allows a graceful shutdown to drain the
 * backlog; shutting the queue down instead would interrupt every pending
 * and future `take`, so queued items could never be processed.
 *
 * Failures of `process` are logged with `ZIO.debug` and otherwise ignored,
 * so one bad item does not stop its worker.
 *
 * NOTE: `submit` checks the shutdown flag and then offers in two steps. A
 * submission racing with a shutdown may therefore be enqueued behind the
 * stop signal and never processed.
 *
 * @param workerCount number of workers; must be positive
 * @param process     effect run for every submitted item
 * @return the balancer, or a failure with `IllegalArgumentException` when
 *         `workerCount` is not positive
 */
def make[A](
  workerCount: Int,
  process: A => Task[A]
): Task[LoadBalancer[A]] =
  if (workerCount <= 0)
    // Without this guard `submit` would divide by zero in the round-robin step.
    ZIO.fail(
      new IllegalArgumentException(
        s"workerCount must be positive, but was $workerCount"
      )
    )
  else
    for {
      // One queue per worker; Vector gives O(1) lookup by round-robin index.
      workerQueues <-
        ZIO.foreach(Vector.range(0, workerCount))(_ =>
          Queue.unbounded[Option[A]]
        )
      roundRobinCounter <- Ref.make(0)
      isShutdown <- Ref.make(false)
      // Start the worker fibers.
      workerFibers <-
        ZIO.foreach(workerQueues.zipWithIndex) { case (queue, idx) =>
          def workerLoop: UIO[Unit] =
            queue.take.flatMap {
              case Some(work) =>
                process(work)
                  .tapError(err =>
                    ZIO.debug(
                      s"Worker $idx failed to process work: ${err.getMessage}"
                    )
                  )
                  .ignore *> workerLoop
              // Stop signal: everything queued before it has been handled.
              case None => ZIO.unit
            }
          workerLoop.forkDaemon
        }
    } yield new LoadBalancer[A] {

      // Idempotent hard stop: shutting down an already shut-down queue and
      // interrupting an already finished fiber are both no-ops.
      private val forceStop: UIO[Unit] =
        ZIO.foreachDiscard(workerQueues)(_.shutdown) *>
          ZIO.foreachDiscard(workerFibers)(_.interrupt)

      override def submit(work: A): Task[Unit] =
        for {
          stopped <- isShutdown.get
          _ <- ZIO.when(stopped)(
            ZIO.fail(
              new IllegalStateException(
                "LoadBalancer has been shutdown"
              )
            )
          )
          // Pick the next queue in round-robin order.
          queueIndex <-
            roundRobinCounter.getAndUpdate(i => (i + 1) % workerCount)
          _ <- workerQueues(queueIndex).offer(Some(work))
        } yield ()

      override def shutdown: Task[Unit] =
        isShutdown
          .getAndSet(true)
          .flatMap { alreadyShutdown =>
            ZIO.unless(alreadyShutdown) {
              forceStop *>
                ZIO.debug("LoadBalancer shutdown complete (forced)")
            }
          }
          .unit
          .uninterruptible

      override def shutdownGracefully(timeout: Duration): Task[Unit] =
        // Flipping the flag and the cleanup are uninterruptible; only the
        // wait itself (wrapped in `restore`) can be interrupted.
        ZIO.uninterruptibleMask { restore =>
          isShutdown
            .getAndSet(true)
            .flatMap { alreadyShutdown =>
              ZIO.unless(alreadyShutdown) {
                val drain: UIO[Unit] =
                  for {
                    _ <- ZIO.debug(
                      s"Starting graceful shutdown with timeout: $timeout"
                    )
                    // New submissions are already rejected via the flag; now
                    // tell every worker to stop once its backlog is done.
                    _ <- ZIO.foreachDiscard(workerQueues)(_.offer(None))
                    _ <- ZIO.debug(
                      "Stop signals enqueued, waiting for workers to finish..."
                    )
                    // `await` never fails, whatever the worker's exit was.
                    drained <- restore(
                      ZIO
                        .foreachDiscard(workerFibers)(_.await)
                        .timeout(timeout)
                    )
                    _ <- drained match {
                      case Some(_) =>
                        ZIO.debug(
                          "All workers finished processing gracefully"
                        )
                      case None =>
                        for {
                          _ <- ZIO.debug(
                            s"Graceful shutdown timed out after $timeout"
                          )
                          // Sizes must be read before the queues are shut
                          // down. An unfinished worker's queue still holds
                          // its stop signal, hence the `- 1`.
                          _ <- ZIO.foreachDiscard(workerQueues.zipWithIndex) {
                            case (q, idx) =>
                              q.size.flatMap { size =>
                                val pending = math.max(0, size - 1)
                                ZIO.when(pending > 0)(
                                  ZIO.debug(
                                    s"Worker $idx had $pending items remaining"
                                  )
                                )
                              }
                          }
                          _ <- forceStop
                          _ <- ZIO.debug(
                            "Workers forcefully interrupted after timeout"
                          )
                        } yield ()
                    }
                  } yield ()
                // Always release queues and fibers, even when the caller is
                // interrupted while waiting.
                drain.ensuring(forceStop) *>
                  ZIO.debug("LoadBalancer shutdown complete")
              }
            }
            .unit
        }
    }
// Enhanced version with metrics and drain monitoring
/**
 * Variant of the load balancer that also keeps counters: total items
 * submitted, total items processed, and items processed per worker.
 *
 * Each queue carries `Option[A]`: `Some(work)` is a job, `None` is a stop
 * signal that makes the worker exit once everything queued before it has
 * been processed. Graceful shutdown enqueues the stop signal and waits for
 * the worker fibers to terminate; shutting the queues down first would
 * interrupt every `take` and leave the backlog unprocessed.
 *
 * An item counts as processed once `process` has finished, whether it
 * succeeded or failed. Failures are logged with `ZIO.debug` and ignored.
 *
 * NOTE: `submit` checks the shutdown flag and then offers in two steps. A
 * submission racing with a shutdown may therefore be enqueued behind the
 * stop signal and never processed.
 *
 * @param workerCount number of workers; must be positive
 * @param process     effect run for every submitted item
 * @return the balancer, or a failure with `IllegalArgumentException` when
 *         `workerCount` is not positive
 */
def makeWithMetrics[A](
  workerCount: Int,
  process: A => Task[A]
): Task[LoadBalancer[A]] =
  if (workerCount <= 0)
    // Without this guard `submit` would divide by zero in the round-robin step.
    ZIO.fail(
      new IllegalArgumentException(
        s"workerCount must be positive, but was $workerCount"
      )
    )
  else
    for {
      workerQueues <-
        ZIO.foreach(Vector.range(0, workerCount))(_ =>
          Queue.unbounded[Option[A]]
        )
      roundRobinCounter <- Ref.make(0)
      isShutdown <- Ref.make(false)
      // Metrics tracking
      totalSubmitted <- Ref.make(0L)
      totalProcessed <- Ref.make(0L)
      processingCounters <-
        ZIO.foreach(Vector.range(0, workerCount))(_ => Ref.make(0L))
      // Start the worker fibers.
      workerFibers <-
        ZIO.foreach(workerQueues.zip(processingCounters).zipWithIndex) {
          case ((queue, counter), idx) =>
            // The recursive call is in tail position of `*>`, so the loop
            // does not accumulate continuations the way a `_ <- workerLoop`
            // followed by `yield ()` would.
            def workerLoop: UIO[Unit] =
              queue.take.flatMap {
                case Some(work) =>
                  process(work)
                    .tapError(err =>
                      ZIO.debug(s"Worker $idx failed: ${err.getMessage}")
                    )
                    .ignore *>
                    counter.update(_ + 1) *>
                    totalProcessed.update(_ + 1) *>
                    workerLoop
                // Stop signal: everything queued before it has been handled.
                case None => ZIO.unit
              }
            workerLoop.forkDaemon
        }
    } yield new LoadBalancer[A] {

      // Idempotent hard stop: shutting down an already shut-down queue and
      // interrupting an already finished fiber are both no-ops.
      private val forceStop: UIO[Unit] =
        ZIO.foreachDiscard(workerQueues)(_.shutdown) *>
          ZIO.foreachDiscard(workerFibers)(_.interrupt)

      override def submit(work: A): Task[Unit] =
        for {
          stopped <- isShutdown.get
          _ <- ZIO.when(stopped)(
            ZIO.fail(
              new IllegalStateException(
                "LoadBalancer is shutting down"
              )
            )
          )
          queueIndex <-
            roundRobinCounter.getAndUpdate(i => (i + 1) % workerCount)
          _ <- totalSubmitted.update(_ + 1)
          _ <- workerQueues(queueIndex).offer(Some(work))
        } yield ()

      override def shutdown: Task[Unit] =
        isShutdown
          .getAndSet(true)
          .flatMap { alreadyShutdown =>
            ZIO.unless(alreadyShutdown) {
              for {
                _ <- forceStop
                submitted <- totalSubmitted.get
                processed <- totalProcessed.get
                _ <- ZIO.debug(
                  s"LoadBalancer force shutdown: processed $processed/$submitted items"
                )
              } yield ()
            }
          }
          .unit
          .uninterruptible

      override def shutdownGracefully(timeout: Duration): Task[Unit] =
        // Flipping the flag and the cleanup are uninterruptible; only the
        // wait itself (wrapped in `restore`) can be interrupted.
        ZIO.uninterruptibleMask { restore =>
          isShutdown
            .getAndSet(true)
            .flatMap { alreadyShutdown =>
              ZIO.unless(alreadyShutdown) {
                val drain: UIO[Unit] =
                  for {
                    submitted <- totalSubmitted.get
                    processed <- totalProcessed.get
                    remaining = submitted - processed
                    _ <- ZIO.debug(
                      s"Starting graceful shutdown: $remaining items remaining to process"
                    )
                    // New submissions are already rejected via the flag; now
                    // tell every worker to stop once its backlog is done.
                    _ <- ZIO.foreachDiscard(workerQueues)(_.offer(None))
                    // `await` never fails, whatever the worker's exit was.
                    drained <- restore(
                      ZIO
                        .foreachDiscard(workerFibers)(_.await)
                        .timeout(timeout)
                    )
                    _ <- drained match {
                      case Some(_) =>
                        totalProcessed.get.flatMap { finalProcessed =>
                          ZIO.debug(
                            s"Graceful shutdown complete: processed all $finalProcessed items"
                          )
                        }
                      case None =>
                        for {
                          finalProcessed <- totalProcessed.get
                          finalSubmitted <- totalSubmitted.get
                          unprocessed = finalSubmitted - finalProcessed
                          _ <- ZIO.debug(
                            s"Graceful shutdown timeout: $unprocessed items unprocessed"
                          )
                          // Log per-worker stats
                          _ <- ZIO.foreachDiscard(
                            processingCounters.zipWithIndex
                          ) { case (counter, idx) =>
                            counter.get.flatMap(count =>
                              ZIO.debug(
                                s"Worker $idx processed $count items"
                              )
                            )
                          }
                        } yield ()
                    }
                  } yield ()
                // Always release queues and fibers: this interrupts workers
                // after a timeout and also runs when the caller is
                // interrupted while waiting.
                drain.ensuring(forceStop)
              }
            }
            .unit
        }
    }
}
// Example usage with graceful shutdown
object LoadBalancerGracefulExample extends ZIOAppDefault {

  /**
   * Demo job: logs the item, sleeps for a random 1000-5000 ms (upper bound
   * exclusive), logs the completion message and returns it.
   */
  private def simulateWork(work: String): Task[String] =
    ZIO.debug(s"Worker processing: $work") *>
      Random.nextIntBetween(1000, 5000).flatMap { millis =>
        val processed = s"Completed: $work"
        ZIO.sleep(millis.milliseconds) *>
          ZIO.debug(processed).as(processed)
      }

  /**
   * Starts a metrics-enabled balancer with three workers, submits twenty
   * tasks in the background, requests a graceful shutdown after 500 ms and
   * shows that a submission made during shutdown is reported.
   */
  def run = {
    val program =
      LoadBalancer
        .makeWithMetrics[String](workerCount = 3, process = simulateWork)
        .flatMap { balancer =>
          // Twenty tasks, each submitted after a 100 ms delay.
          val submitBatch =
            ZIO.foreach(1 to 20) { i =>
              balancer.submit(s"Task-$i").delay(100.milliseconds)
            }

          // A submission attempted while the shutdown is in progress.
          val lateSubmission =
            balancer.submit("Late-Task").either.flatMap {
              case Right(_) =>
                ZIO.debug(
                  "Unexpected: Submission succeeded during shutdown"
                )
              case Left(e) =>
                ZIO.debug(
                  s"Expected: Cannot submit during shutdown - ${e.getMessage}"
                )
            }

          for {
            _ <- submitBatch.fork // submit in the background
            _ <- ZIO.sleep(500.milliseconds) // let some work get processed
            _ <- ZIO.debug("Initiating graceful shutdown...")
            stopping <- balancer.shutdownGracefully(3.seconds).fork
            _ <- lateSubmission
            _ <- stopping.join // wait for the shutdown to finish
            _ <- ZIO.debug("Application complete")
          } yield ()
        }
    program.exitCode
  }
}
// Test comparing shutdown vs shutdownGracefully
object LoadBalancerComparisonTest extends ZIOAppDefault {

  /**
   * Submits ten slow items (200 ms each) to a two-worker balancer, waits
   * 100 ms, stops the balancer with the selected method and returns how
   * many items had been processed at that point.
   *
   * @param useGraceful `true` to call `shutdownGracefully(5.seconds)`,
   *                    `false` to call `shutdown`
   */
  def testShutdownMethod(useGraceful: Boolean) =
    ZIO
      .debug(
        s"\n=== Testing ${if (useGraceful) "GRACEFUL" else "FORCED"} shutdown ==="
      )
      .flatMap(_ => Ref.make(0))
      .flatMap { processedCount =>
        // Slow job that bumps the shared counter once it has slept.
        val slowJob = (work: Int) =>
          ZIO.sleep(200.milliseconds) *>
            processedCount.update(_ + 1) *>
            ZIO.debug(s"Processed item $work").as(work)

        for {
          balancer <- LoadBalancer.make[Int](workerCount = 2, process = slowJob)
          _ <- ZIO.foreach(1 to 10)(item => balancer.submit(item))
          _ <- ZIO.sleep(100.milliseconds) // brief delay before stopping
          stop =
            if (useGraceful) balancer.shutdownGracefully(5.seconds)
            else balancer.shutdown
          _ <- stop
          count <- processedCount.get
          _ <- ZIO.debug(s"Items processed: $count / 10")
        } yield count
      }

  /** Runs the forced scenario, pauses one second, runs the graceful one and prints both counts. */
  def run =
    testShutdownMethod(useGraceful = false).flatMap { forcedCount =>
      ZIO.sleep(1.second) *>
        testShutdownMethod(useGraceful = true).flatMap { gracefulCount =>
          ZIO.debug(s"\nResults:") *>
            ZIO.debug(s"Forced shutdown processed: $forcedCount items") *>
            ZIO.debug(s"Graceful shutdown processed: $gracefulCount items")
        }
    }
}
// (End of gist; the GitHub page footer prompting sign-in to comment is not part of the source.)