karlroberts/19f53eed78e0ad19f206912b474a6b1f
Last active January 8, 2020 04:18
Show how to get some tasks in a cats.effect.IO program to run concurrently with the other IOs in a for comprehension
It is confusing to many (including me), so here is a demo.
I think the main confusion comes from understanding the differences between blocking, non-blocking, concurrency, asynchronicity, unsafeRunSync and unsafeRunAsync.
Concurrent means tasks run, or appear to run, at the same time, either by running on different processors or machines or by time-slicing threads.
Asynchronous means we don't wait for the result of a call. This is easy if the function is a procedure that exists only for its side effect, i.e.
it returns Unit (), but if it returns a value then we need a callback handler to deal with that value when it eventually arrives. That callback may run on the calling thread or on another one.
All synchronous calls running locally block: you have to wait for the result of one call before the next one proceeds.
If that local function takes a long time, you will notice it!
But people usually talk about blocking calls when calling remote services or doing any other kind of IO, e.g. writing to a disk.
This is because these calls are orders of magnitude slower than running code on the same CPU or thread, and so
appear to block the calling process. From the caller's point of view this is no different from calling a slow, CPU-intensive
local function; the difference is that a thread blocked on IO sits idle, waiting, rather than keeping the CPU busy.
If you can dispatch a callback handler to deal with the response eventually, then you can skip past
that blocking call and carry on doing useful work (for other contexts or client calls) while waiting for the slow call
to complete.
This is what people mean by non-blocking: you don't waste a thread that just sits there doing nothing for seconds at
a time.
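A minimal sketch of the difference, using only the Scala standard library's Future (not cats-effect yet). slowCall is a hypothetical stand-in for a remote call that just sleeps for 200 ms before answering 42, and the two-thread pool is an arbitrary choice:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object BlockingVsCallback {
  // A small pool standing in for "somewhere else" that does the slow work.
  private val pool = Executors.newFixedThreadPool(2)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

  // Hypothetical stand-in for a slow remote call: sleeps 200 ms, then answers 42.
  def slowCall(): Future[Int] = Future { Thread.sleep(200); 42 }

  // Blocking: the calling thread sits idle inside Await.result until the answer arrives.
  def blocking(): Int = Await.result(slowCall(), 5.seconds)

  // Non-blocking: register a callback and return at once. onResult runs later on a
  // pool thread when the answer is ready (failures are ignored here for brevity).
  def nonBlocking(onResult: Int => Unit): Unit =
    slowCall().foreach(onResult)

  def shutdown(): Unit = pool.shutdown()
}
```

With blocking() the calling thread does nothing for the whole 200 ms. nonBlocking returns as soon as the callback is registered, and onResult runs later on one of the pool's threads, so the caller can get on with other work in the meantime.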
cats-effect's IO is a mechanism for running anything that needs IO, i.e. a side effect such as writing to a disk,
calling a service or printing to the console, in a pure manner.
Side effects (functions that have no result, i.e. return Unit) are simply CPU-spinning electricity wasters as far as a
functional program is concerned. Functional programs are effectively calculations that return a result value.
Of course, side effects are how real work gets done in the world outside the calculation, e.g. writing
the result of a pure calculation to the console.
Side effects are not referentially transparent and so are hard to reason about in a pure functional programming style.
cats.effect.IO is a monad that wraps side effects in pure calculations. It does this by suspending the side effect in an IO.
An IO is a value (and so pure: no side effect is executed when it is created); it is really a description of how to run the side effect when you
get around to running it.
In this way many side effects can be composed together in a functional style and then run when convenient.
This is done with IO's unsafeRunSync and unsafeRunAsync methods and their cousins.
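A small sketch of that suspension, assuming cats-effect 2.x (the version the demo below is written against); the counter is just a hypothetical observable side effect. Building bump and twice runs nothing; each call to unsafeRunSync runs the whole description again:

```scala
import java.util.concurrent.atomic.AtomicInteger
import cats.effect.IO
import cats.syntax.all._

object SuspendDemo {
  // A side effect we can observe from outside: how often was the "world" touched?
  val counter = new AtomicInteger(0)

  // Defining this value runs nothing; it only describes the effect.
  val bump: IO[Int] = IO { counter.incrementAndGet() }

  // Composing descriptions still runs nothing; the result is the second bump's value.
  val twice: IO[Int] = bump *> bump
}
```

If bump were an ordinary val holding counter.incrementAndGet(), the increment would happen once, at definition time, and reusing the val would not repeat it. With IO, reusing the value reruns the effect, which is what keeps the code referentially transparent.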
Confusion can arise here with respect to concurrent and async programming.
See https://typelevel.org/cats-effect/datatypes/io.html#unsafe-operations
unsafeRunSync :- will "read" the IO "program" description and run the side effects synchronously on the current thread... you will
have to wait for it, i.e. the thread is blocked, and you will have to handle the result, success or failure (a failure is re-thrown as an exception).
unsafeRunAsync :- will "read" the IO "program" description and run the side effects asynchronously. It returns Unit rather than the
result, so your code carries on; you provide a callback that handles the result, success or failure, as an Either[Throwable, A].
Be aware that the run loop starts on the calling thread and keeps it busy until the IO reaches its first asynchronous boundary
(e.g. a shift or an async call). The callback runs on whichever thread completes the IO, which is the calling thread if there was no such boundary.
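A sketch contrasting the two, again assuming cats-effect 2.x. ok and failing are hypothetical programs, and the latch in runAsync exists only so the demo can read the callback's result back:

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicReference
import cats.effect.IO

object RunDemo {
  val ok: IO[Int] = IO { 21 * 2 }
  val failing: IO[Int] = IO.raiseError(new RuntimeException("boom"))

  // unsafeRunSync: the calling thread waits for the outcome;
  // a failed IO is re-thrown here as an exception.
  def runSync(io: IO[Int]): Int = io.unsafeRunSync()

  // unsafeRunAsync: returns Unit rather than the result; the outcome is handed to the
  // callback as Left(error) or Right(value). With no async boundary in `io` (as here)
  // the callback actually runs on the calling thread before unsafeRunAsync returns.
  def runAsync(io: IO[Int]): Either[Throwable, Int] = {
    val done = new CountDownLatch(1)
    val outcome = new AtomicReference[Either[Throwable, Int]]()
    io.unsafeRunAsync { result =>
      outcome.set(result)
      done.countDown()
    }
    done.await(5, TimeUnit.SECONDS) // demo only: wait so we can report the outcome
    outcome.get
  }
}
```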
Of course you may want the callback to run on another thread, concurrently with the main program, e.g. to update a
log or database record. cats.effect.IO gives you the ability to shift execution onto another thread using ContextShift.
ContextShift lets you specify an ExecutionContext to run the IO on, usually backed by a thread pool.
Now the thing that many people, including myself, want to do is program with IO but control how and where code runs,
so we can dispatch an async call whose callback is handled by a thread pool other than the main (global)
fork-join work-stealing pool.
You may have a chain of IOs in a for comprehension doing busy work on the efficient global fork-join pool, but one of the IOs may be
a blocking call to a remote system... Rather than block the fork-join pool, which would stop other busy work from happening,
you can use the ContextShift's evalOn to run that IO on a blocking thread pool. The for comprehension of IO monads will still enforce the sequencing of events
(sequencing is the main thing a monad does), but some tasks will happen on another pool.
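A sketch of that, assuming cats-effect 2.x; blockingPool and its blocking-N thread names are hypothetical, and ExecutionContext.global stands in for IOApp's compute pool. Each step reports the thread it ran on:

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicLong
import scala.concurrent.ExecutionContext
import cats.effect.{ContextShift, IO}
import cats.syntax.all._

object EvalOnDemo {
  // Daemon threads named blocking-0, blocking-1, ... so we can see where code ran
  // (daemon so this pool never keeps the JVM alive).
  private val names = new AtomicLong(0)
  private val factory: ThreadFactory = (r: Runnable) => {
    val t = new Thread(r, s"blocking-${names.getAndIncrement()}")
    t.setDaemon(true)
    t
  }
  val blockingPool: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newCachedThreadPool(factory))

  // ContextShift on the global pool, standing in for IOApp's compute pool.
  val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val threadName: IO[String] = IO(Thread.currentThread().getName)

  // The for comprehension still runs the three steps strictly in order,
  // but the middle one is evaluated on the blocking pool.
  val program: IO[(String, String, String)] = for {
    before <- cs.shift *> threadName
    during <- cs.evalOn(blockingPool)(IO(Thread.sleep(100)) *> threadName)
    after  <- threadName
  } yield (before, during, after)
}
```

during should come from a blocking-N thread, while before and after run on the global pool: evalOn shifts back to the ContextShift's own pool when the inner IO finishes, which matches io3 going back to an ioapp-compute thread in the log below.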
However, you may have an IO in the sequence that you don't want to wait for; it can happen concurrently as you progress to the next IO,
e.g. logging statements or fire-and-forget procedures such as closing down and tidying up resources, that don't really contribute to the
"calculation" you are doing but need to be scheduled after certain tasks are done.
IO gives you access to a "green thread" mechanism called fibers, which are very lightweight and allow cooperative concurrency.
Calling start on an IO[A] returns an IO[Fiber[IO, A]] (where A is the result type of the IO).
Of course this is still an IO, so nothing happens yet, but the "description of the side-effect program" can be read as
"when you reach start, kick off a running fiber to run this IO's side effects".
This gets the work going, but to see the result (remember it happened concurrently) you need to call join on the fiber, which waits for the fiber to finish (without blocking a thread) and passes its result back to the main sequence of execution.
In cats-effect 2, start needs an implicit ContextShift[IO], which it uses to shift the new fiber onto a thread pool; that is what lets the fiber run concurrently with the caller. Because it is just an implicit parameter, you can also
pass one explicitly to run the fiber on a different thread pool, which is useful if the work will be blocking.
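A sketch of start and join, assuming cats-effect 2.x; work and the side-pool thread are hypothetical. One fiber uses the implicit ContextShift, the other gets one passed explicitly, and the caller does its own work while both are still running:

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.ExecutionContext
import cats.effect.{ContextShift, IO}
import cats.syntax.all._

object FiberDemo {
  // One daemon thread called "side-pool", so the result shows where a fiber ran
  // (daemon so it never keeps the JVM alive).
  private val factory: ThreadFactory = (r: Runnable) => {
    val t = new Thread(r, "side-pool")
    t.setDaemon(true)
    t
  }
  private val sidePool: ExecutionContext =
    ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(factory))

  // The ContextShift that `start` picks up implicitly when none is passed.
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  // Sleeps (blocking its thread, like io1..io3 in the demo), then reports where it ran.
  def work(name: String, millis: Long): IO[String] =
    IO(Thread.sleep(millis)) *> IO(s"$name on ${Thread.currentThread().getName}")

  val program: IO[(String, String, String)] = for {
    f1 <- work("implicit", 500).start                            // uses the implicit cs
    f2 <- work("explicit", 500).start(IO.contextShift(sidePool)) // explicit ContextShift
    me <- work("caller", 500) // runs while both fibers are still sleeping
    r1 <- f1.join             // wait for each fiber and take its result
    r2 <- f2.join
  } yield (me, r1, r2)
}
```

All three sleeps overlap, so the whole program should take roughly 500 ms rather than 1500 ms. Each IO here still blocks a thread with Thread.sleep; the fibers only help because they are shifted onto other threads.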
/*
The code below has io1, io2 and io3, which sleep for 2 seconds, 3 seconds and 2 seconds respectively.
I sequence them in order in the for comprehension, but show how they can be context shifted onto other pools or
run concurrently in fibers. The results are attached to this gist.
Note I use IOApp rather than App: IOApp automatically calls the run function to get the IO and then runs it (essentially with
unsafeRunSync). It also provides implicit ContextShift and Timer instances backed by its own compute pool, whose threads show
up as ioapp-compute-N in the log.
*/
package kfoo

import java.util.concurrent.Executors
import cats.effect._
import cats.syntax.all._
import org.apache.logging.log4j.scala.Logging
import scala.concurrent.ExecutionContext

object ThreadPools {
  import java.util.concurrent.ThreadFactory
  import java.util.concurrent.atomic.AtomicLong

  // so I can name threads
  def threadFactory(prefix: String, initialVal: Long = 0): ThreadFactory =
    new ThreadFactory() {
      final private val threadIndex = new AtomicLong(initialVal)
      override def newThread(runnable: Runnable): Thread = {
        val thread = new Thread(runnable)
        thread.setName(prefix + "-" + threadIndex.getAndIncrement)
        thread
      }
    }

  // a pool with a single thread in it, for when we know there is just one thing to do
  val singleAsyncExecutor =
    Executors.newSingleThreadExecutor(threadFactory("single-async-1"))
  val async1 = ExecutionContext.fromExecutor(singleAsyncExecutor)

  // a cached pool adds more threads if none are available and removes them again once they
  // have been idle for 60 seconds. Useful for remote blocking operations: it's no problem to have
  // thousands of threads (other than memory overhead) if they are blocked, i.e. waiting and not bothering the CPU
  val blocker1Executor =
    Executors.newCachedThreadPool(threadFactory("blocker-1"))
  val blocker1 = ExecutionContext.fromExecutor(blocker1Executor)
}
object KarlDemoConcurrent extends IOApp with Logging {
  import ThreadPools._

  def run(args: List[String]): IO[ExitCode] =
    (
      IO { logger.info("starting doIOS_AllSynchronousSamePool") } *> doIOS_AllSynchronousSamePool() *>
      IO { logger.info("\n\nstarting doIOS_AllSynchronousDiffPool") } *> doIOS_AllSynchronousDiffPool() *>
      IO { logger.info("\n\nstarting doIOS_io2IsConcurrentSamePool") } *> doIOS_io2IsConcurrentSamePool() *>
      IO { logger.info("\n\nstarting doIOS_io2IsConcurrentDiffPool") } *> doIOS_io2IsConcurrentDiffPool() *>
      IO { logger.info("\n\nstarting doIOS_AllConcurrent") } *> doIOS_AllConcurrent()
    ).as(ExitCode.Success)

  // Thread.sleep deliberately blocks the current thread, standing in for slow blocking work
  def io1 = IO { Thread.sleep(2000) } *> IO { logger.info("This is io1") }
  // parenthesised so that handleErrorWith covers the sleep as well as the log call
  def io2 = (IO { Thread.sleep(3000) } *> IO { logger.info("This is io2") }).handleErrorWith(t => IO { logger.error("WTF", t) })
  def io3 = IO { Thread.sleep(2000) } *> IO { logger.info("This is io3") }

  // io1, io2, io3 one after another on IOApp's default compute pool
  def doIOS_AllSynchronousSamePool() =
    for {
      _ <- io1
      _ <- io2
      _ <- io3
    } yield ()

  // still strictly sequential, but io2 is evaluated on the single-async-1 pool;
  // evalOn shifts back to the default pool afterwards, so io3 runs on an ioapp-compute thread again
  def doIOS_AllSynchronousDiffPool() =
    for {
      _ <- io1
      _ <- contextShift.evalOn(async1)(io2)
      _ <- io3
    } yield ()

  // io2 runs in a fiber (started with IOApp's implicit ContextShift); io3 does not wait for it,
  // and join waits for io2 at the end
  def doIOS_io2IsConcurrentSamePool() =
    for {
      _      <- io1
      fiber2 <- io2.start
      _      <- io3
      _      <- fiber2.join
    } yield ()

  // as above, but the fiber runs on the single-async-1 pool because a ContextShift is passed explicitly;
  // after join, execution continues on the thread that completed the fiber (single-async-1-0 in the log)
  def doIOS_io2IsConcurrentDiffPool() =
    for {
      _      <- io1
      fiber2 <- io2.start(IO.contextShift(async1))
      _      <- io3
      _      <- fiber2.join
    } yield ()

  // all three start at once; the joins just wait for them all to finish
  def doIOS_AllConcurrent() =
    for {
      fiber1 <- io1.start
      fiber2 <- io2.start(IO.contextShift(async1))
      fiber3 <- io3.start
      _      <- fiber2.join
      _      <- fiber3.join
      _      <- fiber1.join
    } yield ()
}
15:13:05.901 [ioapp-compute-0] INFO kfoo.KarlDemoAsync$ - starting doIOS_AllSynchronousSamePool
15:13:07.904 [ioapp-compute-0] INFO kfoo.KarlDemoAsync$ - This is io1
15:13:10.909 [ioapp-compute-0] INFO kfoo.KarlDemoAsync$ - This is io2
15:13:12.914 [ioapp-compute-0] INFO kfoo.KarlDemoAsync$ - This is io3
15:13:12.915 [ioapp-compute-0] INFO kfoo.KarlDemoAsync$ -
starting doIOS_AllSynchronousDiffPool
15:13:14.915 [ioapp-compute-0] INFO kfoo.KarlDemoAsync$ - This is io1
15:13:17.952 [single-async-1-0] INFO kfoo.KarlDemoAsync$ - This is io2
15:13:19.963 [ioapp-compute-1] INFO kfoo.KarlDemoAsync$ - This is io3
15:13:19.964 [ioapp-compute-1] INFO kfoo.KarlDemoAsync$ -
starting doIOS_io2IsConcurrentSamePool
15:13:21.964 [ioapp-compute-1] INFO kfoo.KarlDemoAsync$ - This is io1
15:13:23.968 [ioapp-compute-1] INFO kfoo.KarlDemoAsync$ - This is io3
15:13:24.967 [ioapp-compute-2] INFO kfoo.KarlDemoAsync$ - This is io2
15:13:24.973 [ioapp-compute-2] INFO kfoo.KarlDemoAsync$ -
starting doIOS_io2IsConcurrentDiffPool
15:13:26.973 [ioapp-compute-2] INFO kfoo.KarlDemoAsync$ - This is io1
15:13:28.977 [ioapp-compute-2] INFO kfoo.KarlDemoAsync$ - This is io3
15:13:29.976 [single-async-1-0] INFO kfoo.KarlDemoAsync$ - This is io2
15:13:29.977 [single-async-1-0] INFO kfoo.KarlDemoAsync$ -
starting doIOS_AllConcurrent
15:13:31.978 [ioapp-compute-3] INFO kfoo.KarlDemoAsync$ - This is io1
15:13:31.983 [ioapp-compute-4] INFO kfoo.KarlDemoAsync$ - This is io3
15:13:32.984 [single-async-1-0] INFO kfoo.KarlDemoAsync$ - This is io2
Process finished with exit code 0