Created
October 6, 2025 09:21
-
-
Save khajavi/da614617b1ffe54530dd45d68c7ccce6 to your computer and use it in GitHub Desktop.
Load Balancer with Graceful Shutdown
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import zio._ | |
/**
 * A dispatcher that accepts work items of type `A` and offers two ways to
 * stop: immediately, or after giving in-flight work a bounded amount of time.
 *
 * @tparam A the type of work item accepted by [[submit]]
 */
trait LoadBalancer[A] {

  /**
   * Hands `work` over for asynchronous processing.
   *
   * The implementations in this file fail the returned effect with an
   * `IllegalStateException` once a shutdown has been requested.
   *
   * @param work the item to be processed
   */
  def submit(work: A): Task[Unit]

  /** Stops the balancer right away, without waiting for pending work. */
  def shutdown: Task[Unit]

  /**
   * Stops accepting new work and gives the workers up to `timeout` to finish
   * before they are stopped forcibly.
   *
   * @param timeout the longest time to wait for the workers
   */
  def shutdownGracefully(timeout: Duration): Task[Unit]
}
/**
 * Factories for round-robin [[LoadBalancer]]s backed by one unbounded queue
 * and one daemon fiber per worker.
 *
 * Each queue carries `Option[A]`: `Some(work)` is an item to process and
 * `None` is a stop marker. A graceful shutdown enqueues the stop marker behind
 * the pending items, so every worker drains its queue and then terminates on
 * its own. The queues are only shut down once the workers are done (or the
 * timeout elapsed), because a shut-down ZIO queue interrupts every later
 * `take`, which would discard the pending items instead of draining them.
 *
 * Note: `submit` checks the shutdown flag and enqueues in two separate steps,
 * so a submission racing with a shutdown may be accepted but never processed.
 */
object LoadBalancer {

  /** A running worker; it finishes normally after taking the stop marker. */
  private type Worker = Fiber.Runtime[Nothing, Unit]

  /** Fails with `IllegalArgumentException` unless at least one worker is requested. */
  private def requireWorkers(workerCount: Int): Task[Unit] =
    ZIO
      .when(workerCount <= 0)(
        ZIO.fail(
          new IllegalArgumentException(
            s"workerCount must be positive, got $workerCount"
          )
        )
      )
      .unit

  /** Creates one unbounded queue per worker (a `Vector`, for cheap indexed access). */
  private def makeQueues[A](workerCount: Int): UIO[Vector[Queue[Option[A]]]] =
    ZIO.foreach(Vector.range(0, workerCount))(_ => Queue.unbounded[Option[A]])

  /**
   * Forks one daemon fiber per queue. Each fiber runs `handle(index, item)`
   * for every `Some(item)` it takes and ends when it takes `None`.
   */
  private def startWorkers[A](
    queues: Vector[Queue[Option[A]]]
  )(handle: (Int, A) => UIO[Any]): UIO[Vector[Worker]] =
    ZIO.foreach(queues.zipWithIndex) { case (queue, idx) =>
      // `*>` keeps the recursion in tail position, so the loop runs in
      // constant space no matter how many items are processed.
      def loop: UIO[Unit] =
        queue.take.flatMap {
          case Some(work) => handle(idx, work) *> loop
          case None       => ZIO.unit
        }
      loop.forkDaemon
    }

  /** Fails with `IllegalStateException(message)` when the flag is already set. */
  private def rejectWhenClosed(closed: Ref[Boolean], message: String): Task[Unit] =
    ZIO.whenZIO(closed.get)(ZIO.fail(new IllegalStateException(message))).unit

  /** Returns the next queue index and advances the round-robin cursor. */
  private def nextIndex(cursor: Ref[Int], workerCount: Int): UIO[Int] =
    cursor.getAndUpdate(i => (i + 1) % workerCount)

  /** Enqueues the stop marker on every queue, behind the work already pending. */
  private def closeIntake[A](queues: Vector[Queue[Option[A]]]): UIO[Unit] =
    ZIO.foreachDiscard(queues)(_.offer(None))

  /**
   * Waits until every worker has terminated, for at most `timeout`.
   * Uses `Fiber.await`, which never fails, so a worker that was interrupted or
   * died cannot propagate its cause to the caller.
   *
   * @return `true` when all workers ended in time, `false` on timeout
   */
  private def awaitWorkers(workers: Vector[Worker], timeout: Duration): UIO[Boolean] =
    ZIO.foreachDiscard(workers)(_.await).timeout(timeout).map(_.isDefined)

  /**
   * Removes whatever is still queued and reports, per worker index, how many
   * real work items (stop markers excluded) were left. Must run before the
   * queues are shut down: afterwards queue operations interrupt the caller.
   */
  private def pendingPerWorker[A](
    queues: Vector[Queue[Option[A]]]
  ): UIO[Vector[(Int, Int)]] =
    ZIO.foreach(queues.zipWithIndex) { case (queue, idx) =>
      queue.takeAll.map(items => idx -> items.count(_.isDefined))
    }

  /** Shuts down every queue and interrupts every worker that is still running. */
  private def forceStop[A](
    queues: Vector[Queue[Option[A]]],
    workers: Vector[Worker]
  ): UIO[Unit] =
    ZIO.foreachDiscard(queues)(_.shutdown) *>
      ZIO.foreachDiscard(workers)(_.interrupt)

  /**
   * Builds a load balancer that spreads work over `workerCount` workers in
   * round-robin order.
   *
   * A failure of `process` is logged with `ZIO.debug` and otherwise ignored;
   * the worker carries on with its next item.
   *
   * @param workerCount number of workers; must be positive, otherwise the
   *                    returned effect fails with `IllegalArgumentException`
   * @param process     the effect run for every submitted item
   */
  def make[A](
    workerCount: Int,
    process: A => Task[A]
  ): Task[LoadBalancer[A]] =
    for {
      _                 <- requireWorkers(workerCount)
      workerQueues      <- makeQueues[A](workerCount)
      roundRobinCounter <- Ref.make(0)
      isShutdown        <- Ref.make(false)
      workerFibers <- startWorkers(workerQueues) { (idx, work) =>
                        process(work)
                          .tapError(err =>
                            ZIO.debug(
                              s"Worker $idx failed to process work: ${err.getMessage}"
                            )
                          )
                          .ignore
                      }
    } yield new LoadBalancer[A] {

      private val stopEverything: UIO[Unit] =
        forceStop(workerQueues, workerFibers)

      override def submit(work: A): Task[Unit] =
        for {
          _          <- rejectWhenClosed(isShutdown, "LoadBalancer has been shutdown")
          queueIndex <- nextIndex(roundRobinCounter, workerCount)
          _          <- workerQueues(queueIndex).offer(Some(work))
        } yield ()

      // Forced stop: pending items are discarded and running work is interrupted.
      // A no-op when any shutdown was already requested.
      override def shutdown: Task[Unit] =
        ZIO
          .unlessZIO(isShutdown.getAndSet(true))(
            stopEverything *>
              ZIO.debug(s"LoadBalancer shutdown complete (forced)")
          )
          .unit
          .uninterruptible

      override def shutdownGracefully(timeout: Duration): Task[Unit] =
        // The flag is flipped inside the uninterruptible region so that an
        // interruption cannot leave the balancer marked as shut down while
        // its workers are still running.
        ZIO.uninterruptibleMask { restore =>
          ZIO
            .unlessZIO(isShutdown.getAndSet(true)) {
              for {
                _ <- ZIO.debug(
                       s"Starting graceful shutdown with timeout: $timeout"
                     )
                // Step 1: new submissions are already rejected via isShutdown.
                // Step 2: ask each worker to stop once its queue is drained.
                _ <- closeIntake(workerQueues)
                _ <- ZIO.debug(
                       "Intake closed, waiting for workers to finish..."
                     )
                // Step 3: only the wait is interruptible; if the caller is
                // interrupted while waiting, the workers are still stopped.
                drained <- restore(awaitWorkers(workerFibers, timeout))
                             .onInterrupt(stopEverything)
                _ <- if (drained)
                       ZIO.debug("All workers finished processing gracefully") *>
                         stopEverything
                     else
                       for {
                         _ <- ZIO.debug(
                                s"Graceful shutdown timed out after $timeout"
                              )
                         remainingWork <- pendingPerWorker(workerQueues)
                         _ <- ZIO.foreachDiscard(remainingWork.filter(_._2 > 0)) {
                                case (idx, size) =>
                                  ZIO.debug(
                                    s"Worker $idx had $size items remaining"
                                  )
                              }
                         _ <- stopEverything
                         _ <- ZIO.debug(
                                "Workers forcefully interrupted after timeout"
                              )
                       } yield ()
                _ <- ZIO.debug("LoadBalancer shutdown complete")
              } yield ()
            }
            .unit
        }
    }

  /**
   * Same as [[make]], but additionally counts submitted and processed items
   * and reports those numbers in its shutdown log messages.
   *
   * An item counts as processed once `process` has finished for it, whether it
   * succeeded or failed. The per-worker counters are updated after processing,
   * so an item that is interrupted mid-flight is not reported as processed.
   *
   * @param workerCount number of workers; must be positive, otherwise the
   *                    returned effect fails with `IllegalArgumentException`
   * @param process     the effect run for every submitted item
   */
  def makeWithMetrics[A](
    workerCount: Int,
    process: A => Task[A]
  ): Task[LoadBalancer[A]] =
    for {
      _                 <- requireWorkers(workerCount)
      workerQueues      <- makeQueues[A](workerCount)
      roundRobinCounter <- Ref.make(0)
      isShutdown        <- Ref.make(false)
      // Metrics tracking
      totalSubmitted <- Ref.make(0L)
      totalProcessed <- Ref.make(0L)
      processingCounters <-
        ZIO.foreach(Vector.range(0, workerCount))(_ => Ref.make(0L))
      workerFibers <- startWorkers(workerQueues) { (idx, work) =>
                        process(work)
                          .tapError(err =>
                            ZIO.debug(s"Worker $idx failed: ${err.getMessage}")
                          )
                          .ignore *>
                          processingCounters(idx).update(_ + 1) *>
                          totalProcessed.update(_ + 1)
                      }
    } yield new LoadBalancer[A] {

      private val stopEverything: UIO[Unit] =
        forceStop(workerQueues, workerFibers)

      override def submit(work: A): Task[Unit] =
        for {
          _          <- rejectWhenClosed(isShutdown, "LoadBalancer is shutting down")
          queueIndex <- nextIndex(roundRobinCounter, workerCount)
          _          <- totalSubmitted.update(_ + 1)
          _          <- workerQueues(queueIndex).offer(Some(work))
        } yield ()

      // Forced stop; uninterruptible so the queues and workers are never left
      // half stopped. A no-op when any shutdown was already requested.
      override def shutdown: Task[Unit] =
        ZIO
          .unlessZIO(isShutdown.getAndSet(true)) {
            for {
              _         <- stopEverything
              submitted <- totalSubmitted.get
              processed <- totalProcessed.get
              _ <- ZIO.debug(
                     s"LoadBalancer force shutdown: processed $processed/$submitted items"
                   )
            } yield ()
          }
          .unit
          .uninterruptible

      override def shutdownGracefully(timeout: Duration): Task[Unit] =
        ZIO.uninterruptibleMask { restore =>
          ZIO
            .unlessZIO(isShutdown.getAndSet(true)) {
              for {
                submitted <- totalSubmitted.get
                processed <- totalProcessed.get
                remaining  = submitted - processed
                _ <- ZIO.debug(
                       s"Starting graceful shutdown: $remaining items remaining to process"
                     )
                // Let every worker drain its queue and then stop by itself.
                _ <- closeIntake(workerQueues)
                // Only the wait is interruptible; an interrupted caller still
                // stops the workers.
                drained <- restore(awaitWorkers(workerFibers, timeout))
                             .onInterrupt(stopEverything)
                finalProcessed <- totalProcessed.get
                finalSubmitted <- totalSubmitted.get
                _ <- if (drained)
                       ZIO.debug(
                         s"Graceful shutdown complete: processed all $finalProcessed items"
                       )
                     else {
                       val unprocessed = finalSubmitted - finalProcessed
                       ZIO.debug(
                         s"Graceful shutdown timeout: $unprocessed items unprocessed"
                       ) *>
                         // Log per-worker stats
                         ZIO.foreachDiscard(processingCounters.zipWithIndex) {
                           case (counter, idx) =>
                             counter.get.flatMap(count =>
                               ZIO.debug(s"Worker $idx processed $count items")
                             )
                         }
                     }
                // Release the queues and interrupt whatever is still running.
                _ <- stopEverything
              } yield ()
            }
            .unit
        }
    }
}
/**
 * Demo application: feeds a metrics-enabled balancer from a background fiber,
 * requests a graceful shutdown shortly afterwards, and shows that a
 * submission made during the shutdown is rejected.
 */
object LoadBalancerGracefulExample extends ZIOAppDefault {

  /** Simulated job: logs, sleeps for a random 1000-5000 ms, logs again. */
  private def simulateWork(work: String): UIO[String] = {
    val processed = s"Completed: $work"
    ZIO.debug(s"Worker processing: $work") *>
      Random
        .nextIntBetween(1000, 5000)
        .flatMap(delay => ZIO.sleep(delay.milliseconds)) *>
      ZIO.debug(processed).as(processed)
  }

  /** Drives one submit / shutdown scenario against the given balancer. */
  private def exercise(balancer: LoadBalancer[String]): Task[Unit] = {
    // Twenty submissions, each delayed by 100 ms.
    val submitBatch =
      ZIO.foreach(1 to 20) { i =>
        balancer.submit(s"Task-$i").delay(100.milliseconds)
      }

    // A submission attempted while the shutdown is in progress; expected to fail.
    val lateSubmission =
      balancer
        .submit("Late-Task")
        .foldZIO(
          e =>
            ZIO.debug(
              s"Expected: Cannot submit during shutdown - ${e.getMessage}"
            ),
          _ =>
            ZIO.debug(
              "Unexpected: Submission succeeded during shutdown"
            )
        )

    submitBatch.fork *> // submit asynchronously
      ZIO.sleep(500.milliseconds) *> // let some work get processed
      ZIO.debug("Initiating graceful shutdown...") *>
      balancer.shutdownGracefully(3.seconds).fork.flatMap { shutdownFiber =>
        lateSubmission *>
          shutdownFiber.join *> // wait for the shutdown to complete
          ZIO.debug("Application complete")
      }
  }

  def run =
    LoadBalancer
      .makeWithMetrics[String](
        workerCount = 3,
        process = (work: String) => simulateWork(work)
      )
      .flatMap(exercise)
      .exitCode
}
/**
 * Demo application that runs the same workload twice, once ended with
 * `shutdown` and once with `shutdownGracefully`, and prints how many items
 * were processed in each run.
 */
object LoadBalancerComparisonTest extends ZIOAppDefault {

  /**
   * Submits ten items to a two-worker balancer whose processing takes 200 ms
   * per item, shuts the balancer down after 100 ms, and yields the number of
   * items processed by then.
   *
   * @param useGraceful `true` to call `shutdownGracefully(5.seconds)`,
   *                    `false` to call `shutdown`
   */
  def testShutdownMethod(useGraceful: Boolean) =
    ZIO.debug(
      s"\n=== Testing ${if (useGraceful) "GRACEFUL" else "FORCED"} shutdown ==="
    ) *>
      Ref.make(0).flatMap { processedCount =>
        // Slow processing: sleep, count the item, log it, hand it back.
        val slowProcess: Int => Task[Int] = work =>
          ZIO.sleep(200.milliseconds) *>
            processedCount.update(_ + 1) *>
            ZIO.debug(s"Processed item $work").as(work)

        for {
          balancer <- LoadBalancer.make[Int](
                        workerCount = 2,
                        process = slowProcess
                      )
          // Submit 10 items
          _ <- ZIO.foreach(1 to 10)(item => balancer.submit(item))
          // Brief delay
          _ <- ZIO.sleep(100.milliseconds)
          // Shutdown
          _ <- if (useGraceful) balancer.shutdownGracefully(5.seconds)
               else balancer.shutdown
          count <- processedCount.get
          _     <- ZIO.debug(s"Items processed: $count / 10")
        } yield count
      }

  def run = {
    val forcedRun   = testShutdownMethod(useGraceful = false)
    val gracefulRun = ZIO.sleep(1.second) *> testShutdownMethod(useGraceful = true)

    forcedRun.flatMap { forcedCount =>
      gracefulRun.flatMap { gracefulCount =>
        ZIO.debug(s"\nResults:") *>
          ZIO.debug(s"Forced shutdown processed: $forcedCount items") *>
          ZIO.debug(s"Graceful shutdown processed: $gracefulCount items")
      }
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment