Created
December 11, 2019 16:33
-
-
Save ShahOdin/e65c41c90f4be89b694b2acb540d9f7c to your computer and use it in GitHub Desktop.
cats concurrency and parallelisation demo
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.itv.sif.db | |
import cats.effect.IOApp | |
import java.util.concurrent.Executors | |
import fs2.Stream | |
import cats.syntax.functor._ | |
import scala.concurrent.ExecutionContext | |
import cats.effect._ | |
/**
 * Helpers for running blocking work on a dedicated thread pool, abstracted over
 * the effect type `F`.
 */
object Toolbox {

  /**
   * Describes a cached thread pool wrapped as an `ExecutionContext`.
   *
   * Acquiring the resource creates the pool and prints a diagnostic line;
   * releasing it calls `shutdown()` on the underlying executor.
   *
   * @tparam F effect type used to suspend the pool creation and shutdown
   * @return a `Resource` that yields the pool's `ExecutionContext`
   */
  def blockingThreadPool[F[_]](implicit F: Sync[F]): Resource[F, ExecutionContext] = {
    val acquire = F.delay {
      val executor = Executors.newCachedThreadPool()
      println("getting the resource ")
      executor
    }

    Resource
      .make(acquire)(executor => F.delay(executor.shutdown()))
      // Expose only the ExecutionContext view; the executor itself stays private to the resource.
      .map(executor => ExecutionContext.fromExecutor(executor): ExecutionContext)
  }

  /**
   * Prompts on stdout and reads one line from stdin, suspended in `F`.
   *
   * @return the line read by `scala.io.StdIn.readLine()`
   */
  def blockingWork[F[_]](implicit F: Sync[F]): F[String] = F.delay {
    println("Enter your name: ")
    scala.io.StdIn.readLine()
  }

  /**
   * Acquires the given pool resource, runs [[blockingWork]] on it and releases
   * the resource afterwards (via `Resource#use`).
   *
   * The `Bracket` context bound is kept so existing call sites keep compiling.
   *
   * @param r  recipe for the execution context the blocking work runs on
   * @param cs context shift used to evaluate the work on that execution context
   */
  def allocateResourcesAndDoWork[F[_]: Sync: Lambda[a[_] => Bracket[a, Throwable]]](r: Resource[F, ExecutionContext])(
    implicit cs: ContextShift[F]
  ): F[String] = r.use(ec => useResourcesAndDoWork[F](ec))

  /**
   * Runs [[blockingWork]] on an already-allocated execution context using
   * `cs.evalOn`. The caller owns the lifecycle of `ec`.
   *
   * @param ec execution context to evaluate the blocking work on
   * @param cs context shift used to perform the evaluation
   */
  def useResourcesAndDoWork[F[_]: Sync](ec: ExecutionContext)(implicit cs: ContextShift[F]): F[String] =
    cs.evalOn(ec)(blockingWork[F])
}
/**
 * Entry point contrasting two ways of using the blocking thread pool from
 * `Toolbox`: one pool per unit of work versus one pool shared by both.
 */
object TestDemo extends IOApp {

  /** Recipe for the blocking pool; nothing is allocated until it is used. */
  def resourceRecipe: Resource[IO, ExecutionContext] = Toolbox.blockingThreadPool[IO]

  /**
   * Reads two names, passing the recipe to `allocateResourcesAndDoWork` once
   * per read, then prints a greeting.
   */
  def eachProcessHasItsOwnResource: IO[ExitCode] =
    Toolbox.allocateResourcesAndDoWork(resourceRecipe).flatMap { first =>
      Toolbox.allocateResourcesAndDoWork(resourceRecipe).flatMap { second =>
        IO(println(s"Hello, $first and $second!")).map(_ => ExitCode.Success)
      }
    }

  /**
   * Reads two names on a single pool obtained through `Stream.resource`, then
   * prints a greeting.
   */
  def processesSharedResource: IO[ExitCode] = {
    val greeting: Stream[IO, ExitCode] =
      Stream.resource(resourceRecipe).flatMap { pool =>
        // Both reads use the same `pool` value produced by the stream.
        val readBothAndGreet: IO[ExitCode] =
          Toolbox.useResourcesAndDoWork[IO](pool).flatMap { first =>
            Toolbox.useResourcesAndDoWork[IO](pool).flatMap { second =>
              IO(println(s"Hello, $first and $second!")).map(_ => ExitCode.Success)
            }
          }
        Stream.eval(readBothAndGreet)
      }

    // The emitted element is discarded; the exit code is supplied explicitly.
    greeting.compile.drain.as(ExitCode.Success)
  }

  /** Runs the shared-resource variant. */
  def run(args: List[String]): IO[ExitCode] = processesSharedResource
}
/**
 * Fiber demo: starts a failing effect on a fiber, runs a second effect with an
 * error handler that would cancel the fiber, then joins the fiber.
 *
 * Because `launchMissiles` always raises, joining the fiber is expected to
 * re-raise that error, so `run` never produces an `ExitCode` value.
 */
object demo extends IOApp {
  import cats.effect.{Fiber, IO}
  import cats.implicits._
  import scala.concurrent.ExecutionContext.Implicits.global

  // Needed for `start`
  // NOTE(review): IOApp may already supply an implicit ContextShift[IO]; confirm this one is still required.
  implicit val ctx: ContextShift[IO] = IO.contextShift(global)

  val io: IO[Unit] = IO(println("Hello!"))
  val fiber: IO[Fiber[IO, Unit]] = io.start

  val launchMissiles: IO[Nothing] = IO.raiseError(new Exception("boom!"))
  val runToBunker: IO[Unit] = IO(println("To the bunker!!!")) // *> IO.raiseError(new Exception("we failed!"))

  /**
   * Starts `launchMissiles` on a fiber, runs `runToBunker`, and joins the fiber.
   *
   * @param args command-line arguments (unused)
   */
  override def run(args: List[String]): IO[ExitCode] = for {
    fiber <- launchMissiles.start
    _ <- runToBunker.handleErrorWith { error =>
      // Retreat failed, cancel launch (maybe we should
      // have retreated to our bunker before the launch?)
      // The println is suspended with IO(...) rather than IO.pure, which is for already-computed values.
      IO(println("quick quick cancel the launch")) *> fiber.cancel *> IO.raiseError(error)
    }
    nothing <- fiber.join
  } yield nothing
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment