Last active
June 24, 2021 14:29
-
-
Save BalmungSan/c8d7b47686052983f8e9818dd0cb5f7c to your computer and use it in GitHub Desktop.
HighLowPriorityRunner - A Cats Effect 3 (CE3) program that schedules high- and low-priority jobs to run on a dedicated ExecutionContext (EC)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// sbt build definition for the example project.

// Compiler version used to build the sources.
scalaVersion := "2.13.6"

// Project coordinates.
name := "low-high-priority-runner"
version := "1.0.0"

// The only library dependency: Cats Effect 3.
libraryDependencies += "org.typelevel" %% "cats-effect" % "3.1.1"

// Execute `run` in a forked JVM instead of inside the sbt process.
run / fork := true
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example | |
import cats.effect.Async | |
import cats.effect.std.Queue | |
import cats.effect.syntax.all._ | |
import cats.syntax.all._ | |
import scala.concurrent.ExecutionContext | |
/** Repeatedly runs queued jobs, always trying the high-priority queue before the low-priority one. */
object HighLowPriorityRunner {

  /**
   * Settings for the runner.
   *
   * @param highPriorityJobs queue that is polled first on every attempt
   * @param lowPriorityJobs  queue that is polled only when the high-priority queue yields no job
   * @param customEC         when defined, the scheduling loop is evaluated on this execution context
   */
  final case class Config[F[_]](
    highPriorityJobs: Queue[F, F[Unit]],
    lowPriorityJobs: Queue[F, F[Unit]],
    customEC: Option[ExecutionContext]
  )

  /**
   * Builds the scheduling loop described by `config`.
   *
   * Each iteration starts a new fiber that polls both queues (high priority first) with
   * `tryTake` and runs the job it finds, if any. The loop is wrapped in `foreverM`, so the
   * returned effect does not complete on its own.
   *
   * NOTE(review): when both queues are empty the spawned fiber finishes right away and the loop
   * immediately spawns another one, which looks like a busy-poll. Confirm this is acceptable.
   *
   * @param config queues to drain and the optional execution context to run on
   * @return an effect that keeps scheduling jobs for as long as it is running
   */
  def apply[F[_]](config: Config[F])
                 (implicit F: Async[F]): F[Unit] = {
    // Nothing to do when neither queue has a job available.
    val idle: F[Unit] = F.unit

    // Fallback step: run a low-priority job if one is immediately available.
    val runLowPriorityIfAny: F[Unit] =
      config.lowPriorityJobs.tryTake.flatMap(maybeJob => maybeJob.getOrElse(idle))

    // Main step: prefer a high-priority job, otherwise fall back to the low-priority queue.
    val runNextJob: F[Unit] =
      config.highPriorityJobs.tryTake.flatMap(maybeJob => maybeJob.getOrElse(runLowPriorityIfAny))

    // Every step is started in its own fiber; the loop itself never waits for it.
    val scheduler = runNextJob.start.foreverM.void

    config.customEC match {
      case Some(ec) => scheduler.evalOn(ec)
      case None     => scheduler
    }
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package example | |
import cats.effect.{Async, IO, IOApp, Resource} | |
import cats.effect.std.Queue | |
import cats.effect.syntax.all._ | |
import cats.syntax.all._ | |
import java.util.concurrent.Executors | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent.duration._ | |
/** Application entry point: runs [[Program]] on a dedicated two-thread pool. */
object Main extends IOApp.Simple {

  /**
   * Creates a fixed pool of two threads, hands it to [[Program]] as an `ExecutionContext`,
   * and shuts the pool down once the program is done with it.
   */
  override final val run: IO[Unit] = {
    val createPool = IO(Executors.newFixedThreadPool(2))

    // The release action runs when `use` finishes, so the pool is always shut down.
    val managedPool = Resource.make(createPool)(executor => IO.blocking(executor.shutdown()))

    managedPool.use { executor =>
      Program[IO](ExecutionContext.fromExecutor(executor))
    }
  }
}
/** Demo program that feeds sample jobs to [[HighLowPriorityRunner]]. */
object Program {

  /**
   * Builds a sample job that prints a start message, sleeps for one second and prints an end message.
   *
   * @param id identifier included in the printed messages
   * @return the job as a suspended effect
   */
  private def createJob[F[_]](id: Int)(implicit F: Async[F]): F[Unit] = {
    // Real code should prefer Console over println.
    val announceStart = F.delay(println(s"Starting job ${id} on thread ${Thread.currentThread.getName}"))
    // Thread.sleep blocks the fiber! This is only for testing; real code should use F.sleep.
    val simulateWork = F.delay(Thread.sleep(1.second.toMillis))
    val announceEnd = F.delay(println(s"Finished job ${id}!"))

    announceStart *> simulateWork *> announceEnd
  }

  /**
   * Starts the runner on `customEC` in a background fiber, then enqueues jobs 0-9 as high
   * priority and 10-14 as low priority, waits five seconds, enqueues jobs 15-19 as high
   * priority, and finally joins the runner fiber.
   *
   * @param customEC execution context passed to the runner's configuration
   * @return an effect that completes when the runner fiber does
   */
  def apply[F[_]](customEC: ExecutionContext)(implicit F: Async[F]): F[Unit] = {
    // Offers one sample job per id in [from, until) to the given queue, in order.
    def enqueue(queue: Queue[F, F[Unit]], from: Int, until: Int): F[Unit] =
      List.range(from, until).traverse_(id => queue.offer(createJob(id)))

    for {
      urgentQueue <- Queue.unbounded[F, F[Unit]]
      backgroundQueue <- Queue.unbounded[F, F[Unit]]
      runner <- HighLowPriorityRunner[F](
        HighLowPriorityRunner.Config[F](
          highPriorityJobs = urgentQueue,
          lowPriorityJobs = backgroundQueue,
          customEC = Some(customEC)
        )
      ).start
      _ <- enqueue(urgentQueue, 0, 10) *>
        enqueue(backgroundQueue, 10, 15) *>
        F.sleep(5.seconds) *>
        enqueue(urgentQueue, 15, 20)
      _ <- runner.join
    } yield ()
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment