Skip to content

Instantly share code, notes, and snippets.

@ShahOdin
Created December 11, 2019 16:33
Show Gist options
  • Save ShahOdin/e65c41c90f4be89b694b2acb540d9f7c to your computer and use it in GitHub Desktop.
Save ShahOdin/e65c41c90f4be89b694b2acb540d9f7c to your computer and use it in GitHub Desktop.
cats concurrency and parallelisation demo
package com.itv.sif.db
import cats.effect.IOApp
import java.util.concurrent.Executors
import fs2.Stream
import cats.syntax.functor._
import scala.concurrent.ExecutionContext
import cats.effect._
/** Helpers for running blocking work on a dedicated thread pool with cats-effect. */
object Toolbox {

  /**
   * Describes a cached thread pool exposed as an `ExecutionContext`.
   *
   * Acquiring the resource creates the pool and prints a trace line; releasing it
   * calls `shutdown()` on the underlying executor.
   *
   * @tparam F effect type in which acquisition and release are suspended
   * @return a resource whose value is the pool's `ExecutionContext`
   */
  def blockingThreadPool[F[_]](implicit F: Sync[F]): Resource[F, ExecutionContext] =
    Resource(F.delay {
      val executor = Executors.newCachedThreadPool()
      val ec: ExecutionContext = ExecutionContext.fromExecutor(executor)
      println("getting the resource ")
      // The second tuple element is the finalizer run when the resource is released.
      (ec, F.delay(executor.shutdown()))
    })

  /**
   * Suspends a prompt on stdout followed by a blocking read of one line from stdin.
   *
   * @return the line that was read, as produced by `scala.io.StdIn.readLine()`
   */
  def blockingWork[F[_]](implicit F: Sync[F]): F[String] = F.delay {
    println("Enter your name: ")
    scala.io.StdIn.readLine()
  }

  /**
   * Acquires the given resource, runs [[blockingWork]] on the `ExecutionContext`
   * it yields, and releases the resource afterwards (via `Resource#use`).
   *
   * @param r  recipe for the execution context to run the blocking work on
   * @param cs context shift used to evaluate the work on that execution context
   * @return the line read by [[blockingWork]]
   */
  def allocateResourcesAndDoWork[F[_]: Sync: Lambda[a[_] => Bracket[a, Throwable]] ](r: Resource[F, ExecutionContext])(
      implicit cs: ContextShift[F]
  ): F[String] =
    // Delegate to the shared helper so both entry points run the work the same way.
    r.use(ec => useResourcesAndDoWork[F](ec))

  /**
   * Runs [[blockingWork]] on an already-acquired `ExecutionContext`.
   *
   * @param ec execution context to evaluate the blocking work on
   * @param cs context shift used to evaluate the work on `ec`
   * @return the line read by [[blockingWork]]
   */
  def useResourcesAndDoWork[F[_]: Sync](ec: ExecutionContext)(implicit cs: ContextShift[F]): F[String] =
    cs.evalOn(ec)(blockingWork[F])
}
/**
 * Demo application contrasting two ways of using the blocking thread pool:
 * acquiring a fresh pool per unit of work, or sharing one pool across several.
 */
object TestDemo extends IOApp {

  /** Recipe for the blocking pool; each use of the recipe acquires its own pool. */
  def resourceRecipe: Resource[IO, ExecutionContext] =
    Toolbox.blockingThreadPool[IO]

  /**
   * Reads two names, acquiring and releasing a separate pool for each read,
   * then prints a greeting.
   */
  def eachProcessHasItsOwnResource: IO[ExitCode] =
    Toolbox.allocateResourcesAndDoWork(resourceRecipe).flatMap { n =>
      Toolbox.allocateResourcesAndDoWork(resourceRecipe).flatMap { m =>
        IO(println(s"Hello, $n and $m!")).map(_ => ExitCode.Success)
      }
    }

  /**
   * Reads two names on a single pool that is acquired once for the whole
   * stream, then prints a greeting.
   */
  def processesSharedResource: IO[ExitCode] = {
    // Both reads and the greeting, all evaluated against the same execution context.
    def greetUsing(sharedEc: ExecutionContext): Stream[IO, ExitCode] =
      Stream.eval(Toolbox.useResourcesAndDoWork[IO](sharedEc)).flatMap { n =>
        Stream.eval(Toolbox.useResourcesAndDoWork[IO](sharedEc)).flatMap { m =>
          Stream.eval(IO(println(s"Hello, $n and $m!"))).map(_ => ExitCode.Success)
        }
      }

    def stream: Stream[IO, ExitCode] =
      Stream.resource(resourceRecipe).flatMap(sharedEc => greetUsing(sharedEc))

    stream.compile.drain.as(ExitCode.Success)
  }

  /** Entry point: runs the shared-resource variant. */
  def run(args: List[String]): IO[ExitCode] = processesSharedResource
}
/**
 * Fiber demo: starts an always-failing "launch" on a fiber, runs the retreat,
 * and then joins the launch fiber.
 */
object demo extends IOApp {
  import cats.effect.{Fiber, IO}
  import cats.implicits._
  import scala.concurrent.ExecutionContext.Implicits.global

  // Needed for `start`. The type is spelled out so the implicit's type does not
  // depend on inference.
  // NOTE(review): IOApp appears to supply its own implicit ContextShift[IO] —
  // confirm whether this explicit one is still required.
  implicit val ctx: ContextShift[IO] = IO.contextShift(global)

  val io: IO[Unit] = IO(println("Hello!"))
  val fiber: IO[Fiber[IO, Unit]] = io.start

  // Always fails with "boom!".
  val launchMissiles: IO[Nothing] = IO.raiseError(new Exception("boom!"))
  val runToBunker: IO[Unit] = IO(println("To the bunker!!!")) // *> IO.raiseError(new Exception("we failed!"))

  /**
   * Starts the launch on its own fiber, runs the retreat, and joins the launch.
   *
   * If the retreat fails, a message is printed, the launch fiber is cancelled
   * and the retreat's error is re-raised. Otherwise the result of the program is
   * whatever joining the launch fiber produces.
   *
   * @param args command-line arguments (unused)
   */
  override def run(args: List[String]): IO[ExitCode] = for {
    launch <- launchMissiles.start
    _ <- runToBunker.handleErrorWith { error =>
      // Retreat failed, cancel launch (maybe we should
      // have retreated to our bunker before the launch?)
      // The println is suspended with IO(...) rather than IO.pure(...): IO.pure
      // evaluates its argument eagerly instead of deferring the side effect.
      IO(println("quick quick cancel the launch")) *> launch.cancel *> IO.raiseError(error)
    }
    nothing <- launch.join
  } yield nothing
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment