Created
February 1, 2019 12:49
-
-
Save yasuabe/8fe34189b59db77eab74184669b2e130 to your computer and use it in GitHub Desktop.
Cats Effect Concurrent examples
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package cats_effect_typeclass_exericise.concurrent | |
import java.time.format.DateTimeFormatter | |
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture} | |
import cats.effect._ | |
import scala.concurrent.duration._ | |
import scala.language.higherKinds | |
/** Console helpers shared by the demo apps.
  *
  * Every helper blocks the calling thread for some time, prints the given
  * value tagged with the current thread id and the wall-clock `mm:ss`, and
  * returns the value rendered as a string.
  */
object Util {
  /** Formats a time as minutes and seconds only (`mm:ss`). */
  val formatter: DateTimeFormatter = DateTimeFormatter.ofPattern("mm:ss")

  /** Delay used by [[putLine]] and [[putIn1Sec]], in milliseconds. */
  private val OneSecondMillis = 1000

  /** Sleeps for `milli` milliseconds, then prints `s` with thread id and time.
    *
    * @param milli how long to block the calling thread (via `Thread.sleep`) before printing
    * @param s     value to print; `null` is accepted and rendered as "null"
    * @return the string rendering of `s`
    */
  def putIn(milli: Int, s: Any): String = {
    Thread.sleep(milli)
    val threadId      = Thread.currentThread.getId
    val currentMinSec = java.time.LocalTime.now.format(formatter)
    // The layout (including the ')' after the time) is left exactly as is:
    // the sample outputs recorded in the demo apps' comments use this format.
    println(s"in thread($threadId) at $currentMinSec): $s")
    // String.valueOf is null-safe, whereas s.toString throws on a null argument.
    String.valueOf(s)
  }

  /** Prints `s` after blocking for one second; same behavior as [[putIn1Sec]]. */
  def putLine(s: Any): String = putIn1Sec(s)

  /** Prints `s` after blocking for one second. */
  def putIn1Sec(s: Any): String = putIn(OneSecondMillis, s)

  /** Prints `s` with a zero-millisecond sleep. */
  def putNow(s: Any): String = putIn(0, s)

  /** Prints `s` after a random delay of 500 ms (inclusive) to 1500 ms (exclusive). */
  def putRnd(s: Any): String = putIn((math.random() * 1000 + 500).toInt, s)
}
import Util._ | |
/** Demo of `Concurrent#uncancelable`.
  *
  * A task wrapped in `uncancelable` is started as a fiber and canceled right
  * away; the recorded output below shows "after sleep" still being printed.
  */
object ConcurrentUncancelableDemoApp extends IOApp {
  val F: Concurrent[IO] = Concurrent[IO]

  def run(args: List[String]): IO[ExitCode] = {
    // The block passed to `uncancelable` is evaluated while `tick` is being
    // built, so "evaluating" is printed here, before "before start"
    // (see the recorded output below). Only the sleep + "after sleep" part
    // is deferred until the fiber runs.
    val tick: IO[Unit] = F.uncancelable {
      putLine("evaluating")
      timer.sleep(1.second).map { _ =>
        putLine("after sleep")
        ()
      }
    }

    putLine("before start")

    // start -> cancel -> join -> report, written as an explicit flatMap chain.
    F.start(tick).flatMap { fiber =>
      fiber.cancel
        .flatMap(_ => fiber.join)
        .flatMap(_ => F.delay(putLine("tick!")))
        .map(_ => ExitCode.Success)
    }
  }
  // Recorded output:
  // in thread(1) at 13:13): evaluating
  // in thread(1) at 13:15): before start
  // in thread(11) at 13:17): after sleep
  // in thread(11) at 13:18): tick!
  //
  // Process finished with exit code 0
}
/** Demo of `Concurrent#racePair`.
  *
  * Two sleeping tasks whose delays always add up to 100 ms are raced; the
  * result of the winner is combined with the joined result of the loser.
  */
object ConcurrentRacePairDemoApp extends IOApp {
  def run(args: List[String]): IO[ExitCode] = {
    putLine("start")

    // Random split of 100 ms between the two contestants.
    val numberDelay = (math.random() * 100).toInt
    val stringDelay = 100 - numberDelay

    val numberTask: IO[Int]    = timer.sleep(numberDelay.millis).map(_ => 12345)
    val stringTask: IO[String] = timer.sleep(stringDelay.millis).map(_ => "hello")

    putLine("before racePair")

    // Which side wins depends on the random delays, so it varies between runs.
    val report: IO[String] =
      Concurrent[IO].racePair(numberTask, stringTask).flatMap {
        case Left((n, stringFiber)) =>
          stringFiber.join.map(s => s"winner: $n, loser: $s")
        case Right((numberFiber, s)) =>
          numberFiber.join.map(n => s"winner: $s, loser: $n")
      }

    report.map { line =>
      putLine(line)
      ExitCode.Success
    }
  }
  // Recorded output:
  // in thread(1) at 17:11): start
  // in thread(1) at 17:12): before racePair
  // in thread(11) at 17:13): winner: 12345, loser: hello
  //
  // Process finished with exit code 0
}
/** Demo of `IO.cancelable`.
  *
  * A callback-based task is scheduled on a Java `ScheduledExecutorService`;
  * its cancel token cancels the scheduled future. The fiber running the task
  * is canceled after one second.
  */
object ConcurrentCancelableDemoApp extends IOApp {
  // NOTE: deliberately left without a type annotation. With an explicit
  // `Concurrent[IO]` type, the implicit lookup on the right-hand side would
  // pick up `F` itself and yield an uninitialized (null) reference.
  implicit val F = Concurrent[IO]
  // Scheduler used to complete the cancelable task; shut down at the end of `run`.
  implicit val s: ScheduledExecutorService = java.util.concurrent.Executors.newScheduledThreadPool(2)
  // Delay after which the scheduled runnable completes the task.
  val d = 1.second

  def run(args: List[String]): IO[ExitCode] = {
    val io: IO[Unit] = IO.cancelable[Unit] { cb =>
      putLine("function k called")
      val r = new Runnable {
        // Completes the IO successfully once the scheduler fires.
        def run(): Unit = { putLine("in runnable "); cb(Right(())) }
      }
      val f: ScheduledFuture[_] = s.schedule(r, d.length, d.unit)
      // The returned IO is the cancel token (CancelToken[IO] = IO[Unit]):
      // it tries to cancel the scheduled future without interrupting it.
      IO {
        val b = f.cancel(false)
        putLine(s"cancel result = $b")
        ()
      }
    }
    putLine("before start")
    val iou = for {
      fiber <- F.start(io)
      _ <- timer.sleep(1.second)
      _ <- fiber.cancel
      // Still printed after the cancel (see the recorded output below):
      // canceling the fiber stops `io`, not this for-comprehension.
      _ <- F.delay { putLine("tick!") }
    } yield ()
    putLine("iou initialized")
    // Always shut the scheduler down when the program finishes, whatever the
    // outcome. Without this the pool is never released, and its worker threads
    // (non-daemon by default) can keep the JVM alive after `run` completes.
    // `shutdown()` does not interrupt running tasks, and by default lets
    // already scheduled delayed tasks run.
    iou
      .guarantee(IO(s.shutdown()))
      .map(_ => ExitCode.Success)
  }
  /* If the fiber is canceled, the following lines are printed: */
  // in thread(1) at 54:21): before start
  // in thread(1) at 54:22): iou initialized
  // in thread(12) at 54:24): function k called
  // in thread(12) at 54:25): cancel result = true
  // in thread(12) at 54:26): tick!
  /* If `_ <- fiber.cancel` is commented out, the following lines are printed: */
  // in thread(1) at 49:15): before start
  // in thread(1) at 49:16): iou initialized
  // in thread(12) at 49:17): function k called
  // in thread(12) at 49:19): tick!
  // in thread(15) at 49:19): in runnable
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment