Skip to content

Instantly share code, notes, and snippets.

@BalmungSan
Last active August 23, 2021 20:48
Show Gist options
  • Save BalmungSan/7b111004a3bb3817799936d856524d4a to your computer and use it in GitHub Desktop.
Save BalmungSan/7b111004a3bb3817799936d856524d4a to your computer and use it in GitHub Desktop.
MultiTaskRunner - A CE3 program that runs multiple tasks in parallel and allows them to report their progress
// sbt build definition for the multi-task-runner example project.
name := "multi-task-runner"
version := "1.0.0"
scalaVersion := "2.13.6"
// Cats Effect 3 is the only library dependency; `%%` appends the Scala binary version to the artifact name.
libraryDependencies += "org.typelevel" %% "cats-effect" % "3.2.3"
// Run the application in a forked JVM instead of inside the sbt process.
// NOTE(review): presumably set because Cats Effect recommends forking for IOApp programs — confirm.
run / fork := true
package example
import cats.effect.IO
import cats.effect.kernel.Ref
import cats.syntax.all._
import scala.concurrent.duration._
/** Runs named tasks in parallel while periodically printing the progress each task reports. */
object MultiTaskRunner {
  /** Allows a task to report its progress. */
  sealed trait TaskProgressReporter {
    /** Records `newProgress` as the latest progress of the task.
      *
      * The value is stored exactly as given (no range validation is performed)
      * and is later printed followed by a `%` sign.
      */
    def reportProgress(newProgress: Int): IO[Unit]
  }

  object TaskProgressReporter {
    /** Creates a reporter that stores every reported value in `ref`. */
    private[MultiTaskRunner] def fromRef(ref: Ref[IO, Int]): TaskProgressReporter =
      new TaskProgressReporter {
        // A report unconditionally overwrites the previous value,
        // so a plain `set` is used rather than `update` with a function that ignores its input.
        override def reportProgress(newProgress: Int): IO[Unit] =
          ref.set(newProgress)
      }
  }

  /** A task factory is a tuple of the task name and
    * a function from its TaskProgressReporter to an IO representing the task work to do.
    */
  type TaskFactory = (String, TaskProgressReporter => IO[Unit])

  /** Delay between two consecutive progress lines printed for a running task. */
  private val ProgressPrintInterval: FiniteDuration = 1.second

  /** Runs the provided list of tasks in parallel.
    *
    * While a task is running, its latest reported progress is printed once per
    * [[ProgressPrintInterval]]; a final "finished" line is printed after the task completes.
    *
    * @param tasksToRun the named tasks to execute.
    * @return an IO that completes once every task has completed.
    */
  def runTasksInParallel(tasksToRun: List[TaskFactory]): IO[Unit] =
    tasksToRun.parTraverse_ {
      case (taskName, taskFactory) =>
        runTask(taskName, taskFactory)
    }

  /** Runs a single task alongside a background loop that prints its progress. */
  private def runTask(taskName: String, taskFactory: TaskProgressReporter => IO[Unit]): IO[Unit] =
    Ref[IO].of(0).flatMap { progressRef =>
      val printProgress: IO[Unit] =
        progressRef.get.flatMap { taskProgress =>
          IO.println(s"Task (${taskName}): ${taskProgress}%")
        }

      // Sleeps first, so nothing is printed before the first interval has elapsed.
      val progressPrinter = (IO.sleep(ProgressPrintInterval) *> printProgress).foreverM

      // `background` ties the printer's lifetime to the `use` block:
      // the printing loop is cancelled as soon as the task itself is done.
      progressPrinter.background.use { _ =>
        taskFactory(TaskProgressReporter.fromRef(progressRef))
      } *> IO.println(s"Task (${taskName}) finished!")
    }
}
package example
import cats.effect.{IO, IOApp}
import cats.syntax.all._
import scala.concurrent.duration._
/** Demo entry point: runs three simulated tasks concurrently through MultiTaskRunner. */
object Program extends IOApp.Simple {
  /** Builds a simulated task that performs `steps` iterations.
    *
    * Each iteration sleeps for `pause` and then reports `iteration * percentPerStep`
    * as the new progress, with iterations numbered from 1 to `steps`.
    */
  private def simulatedTask(
    steps: Int,
    pause: FiniteDuration,
    percentPerStep: Int
  ): MultiTaskRunner.TaskProgressReporter => IO[Unit] =
    reporter =>
      (1 to steps).toList.traverse_ { step =>
        IO.sleep(pause) *> reporter.reportProgress(newProgress = step * percentPerStep)
      }

  override final val run: IO[Unit] = {
    val demoTasks: List[MultiTaskRunner.TaskFactory] = List(
      ("Task one", simulatedTask(steps = 10, pause = 2.seconds, percentPerStep = 10)),
      ("Task two", simulatedTask(steps = 3, pause = 3.seconds, percentPerStep = 33)),
      ("Task three", simulatedTask(steps = 20, pause = 500.milliseconds, percentPerStep = 5))
    )

    MultiTaskRunner.runTasksInParallel(demoTasks)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment