Last active
March 5, 2019 15:06
-
-
Save mihaisoloi/04f9ddbb16292fa3b6b3769d2744244e to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import cats.effect.ExitCode | |
import monix.eval.{Task, TaskApp} | |
import monix.execution.Scheduler | |
import monix.execution.misc.Local | |
import cats.implicits._ | |
import monix.execution.schedulers.TracingScheduler | |
import java.util.concurrent.{CompletableFuture, TimeUnit} | |
import scala.compat.java8.FutureConverters.toScala | |
/** Demo application that prints the value of a Monix `Local` at several
  * points of a `Task` chain, including a step that completes on a thread
  * started by a Java `CompletableFuture`.
  */
object TaskLocalApp extends TaskApp {

  /** Main scheduler of the application, created with `Scheduler.traced`. */
  override val scheduler: Scheduler = Scheduler.traced

  /** IO scheduler named "io", wrapped in a `TracingScheduler`. */
  val io: Scheduler = TracingScheduler(Scheduler.io("io"))

  /** The `Local` being observed throughout the demo; its default value is 0. */
  val globalLocal: Local[Int] = Local[Int](0)

  /** A unit task forced to execute asynchronously. Not referenced elsewhere in this object. */
  val asyncBoundary: Task[Unit] = Task.unit.executeAsync

  /** First step: prints the current thread and `Local` value, then sets the `Local` to 1.
    * Runs with local-context propagation enabled.
    */
  val init: Task[Unit] = Task {
    println(s"Starts on thread: ${Thread.currentThread.getName} local: ${globalLocal.get}")
    globalLocal.update(1)
  }.executeWithOptions(_.enableLocalContextPropagation)

  /** A Java future that sleeps one second and then prints the thread and `Local` value.
    * Because it is a `val` built with `CompletableFuture.runAsync`, it is submitted as soon
    * as this object is initialised, not when `run` reaches it.
    */
  val futureLike: CompletableFuture[Void] = CompletableFuture.runAsync(() => {
    TimeUnit.SECONDS.sleep(1)
    println(s"Future like on thread: ${Thread.currentThread.getName} local: ${globalLocal.get}")
  })

  /** Prints the thread and `Local` value, sets the `Local` to 2, executing on [[io]]. */
  val forked: Task[Unit] =
    Task {
      println(s"Running on thread: ${Thread.currentThread.getName} local: ${globalLocal.get}")
      globalLocal.update(2)
    }.executeOn(io)

  /** Final step: prints the thread and `Local` value the chain ends with. */
  val onFinish: Task[Unit] =
    Task(println(s"Ends on thread: ${Thread.currentThread.getName} local: ${globalLocal.get}"))

  /** Shared implementation of both `quarantine` overloads.
    *
    * The value of [[globalLocal]] is read inside a `Task`, i.e. when the returned task
    * runs rather than when it is built, so a stored or re-run task never restores a stale
    * value. The by-name `shifted` is likewise only evaluated at run time.
    *
    * @param shifted the task to run, already followed by its async boundary
    * @return a task yielding the result of `shifted` after writing the saved value back
    */
  private def restoringLocal[A](shifted: => Task[A]): Task[A] =
    for {
      saved  <- Task(globalLocal.get)
      result <- shifted
      _      <- Task(globalLocal.update(saved))
    } yield result

  /** Runs `task`, inserts an async boundary, then restores [[globalLocal]] to the value it
    * had just before `task` started.
    *
    * @param task the task whose effect on [[globalLocal]] should be undone
    * @return a task producing the same result as `task`
    */
  def quarantine[A](task: => Task[A]): Task[A] =
    restoringLocal(task.asyncBoundary)

  /** Same as the single-argument overload, but the async boundary shifts to `scheduler`.
    *
    * @param task      the task whose effect on [[globalLocal]] should be undone
    * @param scheduler the scheduler passed to `asyncBoundary`
    * @return a task producing the same result as `task`
    */
  def quarantine[A](task: => Task[A], scheduler: Scheduler): Task[A] =
    restoringLocal(task.asyncBoundary(scheduler))

  /** Entry point: runs [[init]], then the quarantined Java future, then [[onFinish]]. */
  def run(args: List[String]): Task[ExitCode] =
    init // executes on the default scheduler
      // .flatMap(_ => forked) // executed on IO and keeps the Local instance
      .flatMap(_ => quarantine(Task.fromFuture(toScala(futureLike)))) // executes on ForkJoinPool and loses the Local without the call to quarantine
      .doOnFinish(_ => onFinish) // executes on the default scheduler
      .as(ExitCode.Success)
}
A more optimal implementation of quarantine is:
/** Runs `task`, shifts to `scheduler`, then restores the `Local` context that was
  * current when the returned task started running.
  *
  * The context is captured inside a `Task`, so it is read at run time rather than when
  * this method is called.
  *
  * @param task      the task to run; evaluated only when the returned task executes
  * @param scheduler the scheduler passed to `asyncBoundary`
  * @return a task producing the same result as `task`
  */
def quarantine[A](task: => Task[A], scheduler: Scheduler): Task[A] =
  for {
    old <- Task(Local.getContext())
    // The parameter is named `task`; the snippet previously referenced an undefined `value`.
    result <- task.asyncBoundary(scheduler)
    _ <- Task(Local.setContext(old))
  } yield result
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Output: