Skip to content

Instantly share code, notes, and snippets.

@yasuabe
Created February 1, 2019 12:49
Show Gist options
  • Save yasuabe/8fe34189b59db77eab74184669b2e130 to your computer and use it in GitHub Desktop.
Save yasuabe/8fe34189b59db77eab74184669b2e130 to your computer and use it in GitHub Desktop.
Cats Effect Concurrent examples
package cats_effect_typeclass_exericise.concurrent
import java.time.format.DateTimeFormatter
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture}
import cats.effect._
import scala.concurrent.duration._
import scala.language.higherKinds
object Util {
val formatter = DateTimeFormatter.ofPattern("mm:ss")
def putIn(milli: Int, s: Any): String = {
Thread.sleep(milli)
val threadId = Thread.currentThread.getId
val currentMinSec = java.time.LocalTime.now.format(formatter)
println(s"in thread($threadId) at $currentMinSec): $s")
s.toString
}
def putLine(s: Any): String = putIn(1000, s)
def putIn1Sec(s: Any): String = putIn(1000, s)
def putNow(s: Any): String = putIn(0, s)
def putRnd(s: Any): String = putIn((math.random * 1000 + 500).toInt, s)
}
import Util._
object ConcurrentUncancelableDemoApp extends IOApp {
val F = Concurrent[IO]
def run(args: List[String]): IO[ExitCode] = {
val tick: IO[Unit] = F.uncancelable {
putLine("evaluating")
timer.sleep(1.second).map { _ => putLine("after sleep")}
}
putLine("before start")
for {
fiber <- F.start(tick)
_ <- fiber.cancel
_ <- fiber.join
_ <- F.delay { putLine("tick!") }
} yield ExitCode.Success
}
// in thread(1) at 13:13): evaluating
// in thread(1) at 13:15): before start
// in thread(11) at 13:17): after sleep
// in thread(11) at 13:18): tick!
//
// Process finished with exit code 0
}
object ConcurrentRacePairDemoApp extends IOApp {
def run(args: List[String]): IO[ExitCode] = {
putLine("start")
val millis = (math.random() * 100).toInt
val ion = timer.sleep( millis .millis).map(_ => 12345)
val ios = timer.sleep((100 - millis).millis).map(_ => "hello")
putLine("before racePair")
Concurrent[IO].racePair(ion, ios) flatMap {
// the winner may vary at each execution
case Left( (n, fiberS)) => fiberS.join.map(s => s"winner: $n, loser: $s")
case Right((fiberN, s )) => fiberN.join.map(n => s"winner: $s, loser: $n")
} map { s => putLine(s); ExitCode.Success }
}
// in thread(1) at 17:11): start
// in thread(1) at 17:12): before racePair
// in thread(11) at 17:13): winner: 12345, loser: hello
//
// Process finished with exit code 0
}
object ConcurrentCancelableDemoApp extends IOApp {
implicit val F = Concurrent[IO]
implicit val s: ScheduledExecutorService = java.util.concurrent.Executors.newScheduledThreadPool(2)
val d = 1.second
def run(args: List[String]): IO[ExitCode] = {
val io: IO[Unit] = IO.cancelable[Unit] { cb =>
putLine("function k called")
val r = new Runnable {
def run() = { putLine("in runnable "); cb(Right(())) }
}
val f: ScheduledFuture[_] = s.schedule(r, d.length, d.unit)
IO { // CancelToken[IO] = IO[Unit]
val b = f.cancel(false)
putLine(s"cancel result = $b")
} // this is the cancel token
}
putLine("before start")
val iou = for {
fiber <- F.start(io)
_ <- timer.sleep(1.second)
_ <- fiber.cancel
_ <- F.delay { putLine("tick!") } // this line never be displayed
} yield ()
putLine("iou initialized")
iou.map (_ => ExitCode.Success)
}
/* if fiber is canceled, then following lines are printed*/
// in thread(1) at 54:21): before start
// in thread(1) at 54:22): iou initialized
// in thread(12) at 54:24): function k called
// in thread(12) at 54:25): cancel result = true
// in thread(12) at 54:26): tick!
/* if `_ <- fiber.cancel` is commented out, then following lines are printed*/
// in thread(1) at 49:15): before start
// in thread(1) at 49:16): iou initialized
// in thread(12) at 49:17): function k called
// in thread(12) at 49:19): tick!
// in thread(15) at 49:19): in runnable
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment