This proposal is about Task, an alternative to Scala's Future, Scalaz's Task or C#'s Task.
Note that this is a work in progress. You can track the current progress at: monifu/tree/task. The implementation currently consists of these classes:
- Task (new)
- Scheduler (an enhanced ExecutionContext, capable of scheduleOnce(delay), already in Monifu)
- Trampoline (new, internal)
- EvictingQueue (internal, an array-backed circular FIFO queue, used by our trampoline, already in Monifu)
- atomic references (already in Monifu)
I view Task as a complement to Scala's Future, not as a replacement. See below for details. The problem we are trying to solve is that a Future represents a running computation that memoizes its result. It's perfect for its primary use cases, but it is sometimes misused and misunderstood:
- every one of Future's operators takes an implicit ExecutionContext (making it slightly incompatible with various type-class interfaces)
- the reason is that every one of Future's operators is side-effecting, immediately sending a task to a thread-pool for execution
- it goes without saying that Future is not referentially transparent
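Both points can be observed with the standard library alone. To keep the order of side effects deterministic, this sketch uses a hypothetical calling-thread ExecutionContext (CallingThread); with a real thread pool the side effect would be submitted for execution just as eagerly, only on another thread.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical ExecutionContext that runs everything on the calling thread,
// so that the order of side effects below is deterministic.
object CallingThread extends ExecutionContext {
  def execute(runnable: Runnable): Unit = runnable.run()
  def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}
implicit val ec: ExecutionContext = CallingThread

var effects = 0

// Constructing the Future already triggers the side effect.
val shared = Future { effects += 1; effects }
val afterConstruction = effects // 1, although nobody asked for the result

// Reusing the same Future value does not repeat the effect (it is memoized).
val reused = for { a <- shared; b <- shared } yield (a, b)
val afterReuse = effects // still 1

// Inlining the definition, which referential transparency would allow,
// changes the behavior: the effect now runs twice more.
val inlined = for {
  a <- Future { effects += 1; effects }
  b <- Future { effects += 1; effects }
} yield (a, b)
val afterInlining = effects // 3
```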
Monifu's Task fixes these problems. Well, for the FP purists, do note that Monifu's Task still breaks the left identity monad law. The law does not hold for functions f that throw exceptions (just like with Try and Future before it): if f throws, calling f(t) directly throws, whereas Task(t).flatMap(f) catches the exception and returns a failed task:
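val f: T => Task[U] = _ => throw new RuntimeException("boom") // a function that throws
Task(t).flatMap(f) === f(t) // not true: the left side is a failed task, the right side throws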
But to me that's unavoidable, and I don't even want to attempt fixing it just to tick a checkbox: Task is about asynchronous computations, which means dealing with non-determinism (on the underlying Java or JavaScript platforms).
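The same violation shows up with the standard library's Try, with no Task involved; the throwing function f below is just an illustration:

```scala
import scala.util.Try

// A function that throws instead of returning a failed Try.
val f: Int => Try[Int] = _ => throw new IllegalStateException("boom")

// Left-hand side: flatMap catches the exception and yields a Failure value.
val left: Try[Int] = Try(1).flatMap(f)

// Right-hand side: calling f directly throws, so there is no value to compare.
val rightThrows: Boolean =
  try { f(1); false }
  catch { case _: IllegalStateException => true }
```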
I must say that I was impressed with the design of the Scalaz Task. The good parts of the Scalaz Task that my Task also has:
- I originally ignored stack overflows, but the authors of Scalaz Task did not; fret not, this Task implementation also protects against stack overflows
- in comparison with Scala's standard Future, a Task only describes an asynchronous computation: nothing is executed and no side effects are triggered until Task.runAsync is called
- the Scalaz Task executes its operators on a trampoline by default, which makes it very efficient, and if we want to parallelize, that choice can be made explicitly; this is in stark contrast with Scala's Future and actually in line with Monifu's Observable, making me very happy :-) ... I copied the idea, though not the implementation
- clean API, dirty but rather elegant and well-encapsulated implementation: the internal design of Task follows that of Monifu's Observable, and I must emphasize the better encapsulation; personally I feel there's no reason to expose the underlying Trampoline mechanics, because who knows, in the future I might not want to use a Trampoline
- the Future in Scalaz is described as a "trampolined computation"; my Task, by comparison, is just an "asynchronous computation"
- you don't get a guarantee that a Task will be executed on the current thread or on a trampoline, as you do when using Scalaz's Task.delay; you only get a promise that the implementation will try its best (in fact my Trampoline has a limited buffer and can start rejecting on execute, falling back to the thread-pool)
- the other guarantee you get when executing my Task is that the stack won't blow up
- the single-threaded performance of Monifu matches that of Scalaz, but when threads are forked, for some reason my implementation has more than twice the throughput, matching and even exceeding that of Scala's standard Future
- in the short and flawed benchmarks I've made, my implementation has better RAM usage characteristics, which makes sense because of the dirty implementation
- Monifu's Task always requires a Scheduler on execution (because it's not the user who gets to decide whether a scheduler or a thread-pool is needed)
- as a consequence, Monifu's Task does not take an implicit thread-pool on builders such as Task.apply, or a scheduler on utilities such as Task#delay: Monifu's Task has clean builders that do not leak implementation details ;-)
- Monifu has its own Scheduler (it extends Scala's ExecutionContext and is the equivalent of a ScheduledExecutorService), so we can work on JavaScript and are not piggybacking on Java's standard library
- this implementation already runs on Scala.js, and runs well
- Monifu's Task integrates with Scala's Future: its runAsync returns a Future, because for what Future does (e.g. memoization, callback management) it is the best choice, plus it's standard
- Monifu's Task has no def run: A or any other synchronous equivalent; a Task is considered to be asynchronous and hence it has no facilities for blocking the current thread, but we can rely on Scala's own facilities and do Await.result(task.runAsync, 10.seconds) (yes, that's the Await utility from the standard library). That's way cooler because we don't want to handle blocking ourselves (it's a platform-dependent thing) and because Await.result is integrated with Scala's BlockContext, which has special benefits for our ForkJoin pool (blocking like the pros)
To answer a question: x1 and x2 will produce the same result and the same side effects in both Monifu and Scalaz:
val x1 =
  for {
    _ <- Task { println("hi") }
    _ <- Task { println("hi") }
  } yield ()

val hi = Task { println("hi") }
val x2 =
  for {
    _ <- hi
    _ <- hi
  } yield ()
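The reason is that a task only describes a computation, so naming it and reusing the name is the same as writing it out twice. A deliberately tiny, synchronous stand-in shows this (Thunk is hypothetical and has none of the asynchrony or stack safety of either Task; a counter replaces println so the effect can be checked):

```scala
// Hypothetical lazy "task": a description of a computation that runs only when asked.
final class Thunk[+A](val run: () => A) {
  def map[B](f: A => B): Thunk[B] = new Thunk(() => f(run()))
  def flatMap[B](f: A => Thunk[B]): Thunk[B] = new Thunk(() => f(run()).run())
}
object Thunk {
  def apply[A](a: => A): Thunk[A] = new Thunk(() => a)
}

var hiCount = 0
def sayHi(): Unit = hiCount += 1 // stands in for println("hi")

val x1 = for {
  _ <- Thunk(sayHi())
  _ <- Thunk(sayHi())
} yield ()

val hi = Thunk(sayHi())
val x2 = for {
  _ <- hi
  _ <- hi
} yield ()

val beforeRunning = hiCount // 0: building x1 and x2 executed nothing
x1.run()
val afterX1 = hiCount // 2
x2.run()
val afterX2 = hiCount // 4: x2 has the same effects as x1
```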
Creating a task (meant to be executed on another logical thread) looks the same in both libraries:
// creating task for execution on a separate logical thread
val task = Task {
  println("Executing ...")
  "Hello, world!"
}
But there is a difference in the signature:
// Scalaz
def apply[A](a: => A)(implicit pool: ExecutorService): Task[A]
// Monifu
def apply[T](f: => T): Task[T]
As a matter of philosophy, the difference is that in Monifu the execution of a Task is ALWAYS asynchronous, meaning that we expect tasks to fork at any moment (plus, the trampoline is not unbounded, and once we hit its limit, we fork), so runAsync is the point where we take our Scheduler. Also, Monifu uses Scheduler, an enhanced ExecutionContext that's also supported on Scala.js, whereas Scalaz relies on Java's ExecutorService.
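Where the execution context enters can be sketched with the standard library. Deferred below is a hypothetical stand-in, with a plain ExecutionContext in place of Monifu's Scheduler: its builder mirrors the by-name apply signature shown above and takes no implicit, and only runAsync asks for an execution context and returns a Future.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical lazy task: the builder takes no ExecutionContext, only runAsync does.
final class Deferred[+T] private (body: () => T) {
  def runAsync(implicit ec: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    // The ExecutionContext decides where (and whether asynchronously) body runs.
    ec.execute(new Runnable {
      def run(): Unit = p.complete(Try(body()))
    })
    p.future
  }
}

object Deferred {
  // Same shape as Monifu's Task.apply: no implicit pool required here.
  def apply[T](f: => T): Deferred[T] = new Deferred(() => f)
}

var runs = 0
val task = Deferred { runs += 1; "Hello, world!" }
val runsBeforeRunAsync = runs // 0: nothing executed yet

// Only now is an execution context supplied (a calling-thread one, for determinism).
val callingThread: ExecutionContext = new ExecutionContext {
  def execute(runnable: Runnable): Unit = runnable.run()
  def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}
val result = Await.result(task.runAsync(callingThread), 1.second)
```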
We can apply various operators to a task. This code is valid in both libraries:
// Yes, this uses a lot of memory (because of that map),
// but should not trigger stack overflows.
// Also, remove the map, and you've got a tail recursion.
def sum(n: Int, acc: Long = 0): Task[Long] = {
  if (n == 0) Task(acc)
  else Task(n).flatMap(x => sum(x - 1, acc + x).map(_ + 1))
}
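Both tasks avoid the stack overflow by trampolining. The standard library's scala.util.control.TailCalls applies the same technique synchronously, and the sum above translates to it almost verbatim. This only illustrates the idea; it is not how Monifu or Scalaz implement their Task.

```scala
import scala.util.control.TailCalls._

// Same shape as the sum above: the trailing map prevents plain tail recursion.
def sum(n: Int, acc: Long = 0): TailRec[Long] =
  if (n == 0) done(acc)
  else tailcall(sum(n - 1, acc + n)).map(_ + 1)

// 100000 nested levels; a naive recursive version would likely overflow the stack.
val big = sum(100000).result
```

Note that, because of the extra map(_ + 1) at each level, the result is the sum of 1 to n plus n, not just the sum.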
We can recover from an error (similar to Future.recover and Future.recoverWith; the same thing is possible with the Scalaz Task):
// Monifu
monifuTask.onErrorRecover {
case _: SomeException =>
"fallback value"
}
monifuTask.onErrorRecoverWith {
case _: SomeException =>
Task("fallback value")
}
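For reference, this is how the standard-library originals behave. A calling-thread ExecutionContext keeps the sketch deterministic and SomeException is a placeholder class. Exceptions the partial function does not match are propagated unchanged; presumably Monifu's onErrorRecover and onErrorRecoverWith do the same, but that is an assumption.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Runs callbacks on the calling thread, so the example is deterministic.
implicit val ec: ExecutionContext = new ExecutionContext {
  def execute(runnable: Runnable): Unit = runnable.run()
  def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}

final class SomeException extends RuntimeException("boom")

val failing: Future[String] = Future.failed(new SomeException)

// recover maps a matching exception to a plain value...
val recovered = failing.recover { case _: SomeException => "fallback value" }

// ...recoverWith maps it to another Future.
val recoveredWith = failing.recoverWith { case _: SomeException => Future("fallback value") }

// An exception the partial function does not match is propagated unchanged.
val notMatched = failing.recover { case _: IllegalArgumentException => "unused" }
```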
We can delay execution:
// Monifu
monifuTask.delay(10.minutes)
// Scalaz
scalazTask.after(10.minutes)
We can interrupt a task if it takes too long to execute:
// will fail with a `TimeoutException` after 10 seconds
monifuTask.timeout(10.seconds)
// will fall back to a backup task if no result arrives within 10 seconds
monifuTask.timeout(10.seconds, Task("another value"))
We can zip 2 tasks:
val a = Task(1)
val b = Task(2)
val c: Task[(Int, Int)] = a.zip(b)
We've got some utilities inspired by Scala's Future:
val taskOfList: Task[Seq[T]] = Task.sequence(listOfTasks)
val task: Task[T] = Task.firstCompletedOf(listOfTasks)
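For comparison, this is how the standard library's Future.sequence and Future.firstCompletedOf behave. Monifu's Task.sequence and Task.firstCompletedOf are presumably analogous, except that nothing starts until runAsync; that correspondence is an assumption here.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Runs callbacks on the calling thread, so the example is deterministic.
implicit val ec: ExecutionContext = new ExecutionContext {
  def execute(runnable: Runnable): Unit = runnable.run()
  def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}

val never: Future[Int] = Promise[Int]().future // never completes
val one = Future.successful(1)
val two = Future.successful(2)

// sequence waits for every element and keeps their order.
val all: Future[List[Int]] = Future.sequence(List(one, two))

// A single pending element keeps the whole sequence pending.
val stuck: Future[List[Int]] = Future.sequence(List(one, never))

// firstCompletedOf mirrors whichever future completes first.
val first: Future[Int] = Future.firstCompletedOf(List(never, two))
```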
Unlike a Future, a Task doesn't execute immediately; just like its Scalaz counterpart, it must be run explicitly before it produces its result and its side effects:
// Execution with Monifu (nothing has happened until now)
// Monifu's Task always requires an execution context on run
import scala.util.{Failure, Success}
import monifu.concurrent.Implicits.globalScheduler

monifuTask.runAsync {
  case Success(value) => println(value)
  case Failure(ex) => ex.printStackTrace()
}

// Execution with Scalaz
import scalaz.{-\/, \/-}

scalazTask.unsafePerformAsync {
  case -\/(ex) => ex.printStackTrace()
  case \/-(value) => println(value)
}
Blocking the current thread for the result:
// in Monifu
Await.result(monifuTask.runAsync, 10.seconds)
// in Scalaz
scalazTask.unsafePerformSyncFor(10.seconds)
Monifu's approach is actually better because we're piggybacking on Await.result from the standard library. This has the benefit that it communicates with Scala's (or soon to be Java's) ForkJoin pool, exposed by the default ExecutionContext, which may add more threads in case a blocking operation occurs (God forbid). It does so by means of the blocking block, in combination with Scala's BlockContext. And that's pretty cool.
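The mechanism can be observed with the standard library alone: blocking { ... } hands its body to BlockContext.current.blockOn, and Await.result goes through blocking when it has to wait. The threads of the default global ExecutionContext supply a BlockContext that can compensate for a blocked thread; CountingBlockContext below is a hypothetical stand-in that merely counts calls.

```scala
import scala.concurrent.{blocking, BlockContext, CanAwait}

// Hypothetical BlockContext that only counts the blocking sections it is told about.
final class CountingBlockContext extends BlockContext {
  @volatile var count = 0
  override def blockOn[T](thunk: => T)(implicit permission: CanAwait): T = {
    count += 1
    thunk // a real pool would compensate for the blocked thread around this call
  }
}

val ctx = new CountingBlockContext

// `blocking` delegates to BlockContext.current, which withBlockContext installs.
val result = BlockContext.withBlockContext(ctx) {
  blocking {
    Thread.sleep(10) // stands in for real blocking I/O
    42
  }
}
```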
I'm also quite fond of my integration with Scala's Future. If you have to memoize the result in order to share it, then Future is there and does what it's supposed to. Plus it surely beats callbacks. So Monifu's Task has the following runAsync signature:
import monifu.concurrent.Implicits.globalScheduler
val task: Task[T] = ???
val future: Future[T] = task.runAsync
// conversion works both ways
val anotherTask = Task.fromFuture(Future("hello"))
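One subtlety of converting from a Future: a Future starts running as soon as it is created, so a strict parameter can only wrap a computation that is already under way, while a by-name parameter can defer its creation. Whether Monifu's Task.fromFuture takes its argument by name is not stated here; the sketch below (LazyFuture, fromFutureStrict and fromFutureByName are all hypothetical) only shows the difference.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Calling-thread ExecutionContext keeps the demo deterministic.
implicit val ec: ExecutionContext = new ExecutionContext {
  def execute(runnable: Runnable): Unit = runnable.run()
  def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}

var started = 0
def makeFuture(): Future[String] = Future { started += 1; "hello" }

// Hypothetical lazy wrapper: it stores how to obtain a Future, not the Future itself.
final class LazyFuture[+A](make: () => Future[A]) {
  def runAsync: Future[A] = make()
}

// Strict parameter: the Future is created (and started) at the call site.
def fromFutureStrict[A](f: Future[A]): LazyFuture[A] = new LazyFuture(() => f)

// By-name parameter: the Future is created only when runAsync is called.
def fromFutureByName[A](f: => Future[A]): LazyFuture[A] = new LazyFuture(() => f)

val strict = fromFutureStrict(makeFuture())
val startedAfterStrict = started // 1: already running before any runAsync

val byName = fromFutureByName(makeFuture())
val startedAfterByName = started // still 1

val byNameResult = Await.result(byName.runAsync, 1.second)
val startedAfterRun = started // 2
```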
A Task models a producer/consumer relationship in which the producer pushes a single value to the consumer on success, or an exception on error, and then stops. Thus we can have a consumer interface that mirrors the Observer in Rx, like so:
abstract class Callback[-T] { self =>
  def onSuccess(value: T): Unit
  def onError(ex: Throwable): Unit

  // might execute onSuccess on a trampoline, or not :-)
  def asyncOnSuccess(s: Scheduler, value: T, fork: Boolean = false): Unit = ???

  // might execute onError on a trampoline, or not :-)
  def asyncOnError(s: Scheduler, ex: Throwable, fork: Boolean = false): Unit = ???
}
This type is not meant for users, but for people implementing operators or other utilities. The contract is this:
- on success, the producer calls onSuccess exactly once, then stops
- on error, the producer calls onError exactly once, then stops
- in operators, we should never call onSuccess or onError directly, as that can trigger stack overflows; instead we should use the Trampoline or fork a new thread
- the contract is not meant for users, but for implementers of operators, or for people using Task.unsafeCreate
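The "exactly once" part of the contract can also be enforced defensively. The wrapper below is a hypothetical sketch (OnceCallback is not part of Monifu, and Callback is trimmed to its two abstract methods): an atomic flag lets at most one signal through, so a buggy producer cannot call onSuccess twice or onError after onSuccess.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Simplified consumer interface (the async helpers are left out here).
abstract class Callback[-T] {
  def onSuccess(value: T): Unit
  def onError(ex: Throwable): Unit
}

// Hypothetical wrapper: only the first signal, whichever it is, reaches the consumer.
final class OnceCallback[-T](underlying: Callback[T]) extends Callback[T] {
  private val done = new AtomicBoolean(false)

  def onSuccess(value: T): Unit =
    if (done.compareAndSet(false, true)) underlying.onSuccess(value)

  def onError(ex: Throwable): Unit =
    if (done.compareAndSet(false, true)) underlying.onError(ex)
}

var successes = 0
var errors = 0
val cb = new OnceCallback[Int](new Callback[Int] {
  def onSuccess(value: Int): Unit = successes += 1
  def onError(ex: Throwable): Unit = errors += 1
})

// A misbehaving producer: only the first call gets through.
cb.onSuccess(1)
cb.onSuccess(2)
cb.onError(new RuntimeException("late"))
```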
So we can model our task:
import scala.util.control.NonFatal

/**
 * For modeling asynchronous computations.
 */
trait Task[+T] { self =>
  /**
   * Characteristic function for our [[Task]].
   * Everything gets implemented based on this function.
   */
  protected def unsafeRunFn(scheduler: Scheduler, callback: Callback[T]): Unit

  // SAMPLE OPERATOR
  def map[U](f: T => U): Task[U] =
    Task.unsafeCreate[U] { (s, cb) =>
      self.unsafeRunFn(s, new Callback[T] {
        def onError(ex: Throwable): Unit =
          cb.asyncOnError(s, ex)

        def onSuccess(value: T): Unit = {
          // only failures of the user's f are turned into onError
          var streamError = true
          try {
            val u = f(value)
            streamError = false
            cb.asyncOnSuccess(s, u)
          } catch {
            case NonFatal(ex) if streamError =>
              cb.asyncOnError(s, ex)
          }
        }
      })
    }

  // ...
}

object Task {
  /** Returns a new task that, when executed, will emit the
    * result of the given function executed asynchronously.
    */
  def apply[T](f: => T): Task[T] =
    Task.unsafeCreate[T] { (scheduler, callback) =>
      scheduler.execute(
        new Runnable {
          def run(): Unit = {
            // protecting against user code; f is evaluated on its own, so
            // that an exception thrown by onSuccess never triggers onError too
            var streamError = true
            try {
              val value = f
              streamError = false
              callback.onSuccess(value)
            } catch {
              case NonFatal(ex) if streamError =>
                callback.onError(ex)
            }
          }
        })
    }

  /** Builder for [[Task]] instances. For usage on implementing
    * operators or builders. Only use if you know what you're doing.
    */
  def unsafeCreate[T](f: (Scheduler, Callback[T]) => Unit): Task[T] =
    new Task[T] {
      override def unsafeRunFn(s: Scheduler, cb: Callback[T]): Unit =
        f(s, cb)
    }
}
I'm actually hiding the implementation details of the trampoline behind the asyncOnSuccess and asyncOnError method calls, because they don't matter. That's my OOP-ridden childhood speaking :-)
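To check that the design hangs together, here is a self-contained, simplified sketch of it. The simplifications are assumptions, not Monifu's code: a plain ExecutionContext stands in for Scheduler, asyncOnSuccess and asyncOnError always go through the execution context instead of a trampoline, and flatMap and runAsync are written here for illustration.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Assumption: a plain ExecutionContext stands in for Monifu's Scheduler.
abstract class Callback[-T] {
  def onSuccess(value: T): Unit
  def onError(ex: Throwable): Unit

  // Simplification: always hop through the execution context instead of a trampoline.
  def asyncOnSuccess(s: ExecutionContext, value: T): Unit =
    s.execute(new Runnable { def run(): Unit = onSuccess(value) })

  def asyncOnError(s: ExecutionContext, ex: Throwable): Unit =
    s.execute(new Runnable { def run(): Unit = onError(ex) })
}

trait Task[+T] { self =>
  protected def unsafeRunFn(s: ExecutionContext, cb: Callback[T]): Unit

  def map[U](f: T => U): Task[U] =
    Task.unsafeCreate[U] { (s, cb) =>
      self.unsafeRunFn(s, new Callback[T] {
        def onError(ex: Throwable): Unit = cb.asyncOnError(s, ex)
        def onSuccess(value: T): Unit = Try(f(value)) match {
          case Success(u)  => cb.asyncOnSuccess(s, u)
          case Failure(ex) => cb.asyncOnError(s, ex)
        }
      })
    }

  def flatMap[U](f: T => Task[U]): Task[U] =
    Task.unsafeCreate[U] { (s, cb) =>
      self.unsafeRunFn(s, new Callback[T] {
        def onError(ex: Throwable): Unit = cb.asyncOnError(s, ex)
        def onSuccess(value: T): Unit = Try(f(value)) match {
          case Success(next) => next.unsafeRunFn(s, cb)
          case Failure(ex)   => cb.asyncOnError(s, ex)
        }
      })
    }

  // Nothing runs before this call; the returned Future memoizes the result.
  def runAsync(implicit s: ExecutionContext): Future[T] = {
    val p = Promise[T]()
    unsafeRunFn(s, new Callback[T] {
      def onSuccess(value: T): Unit = p.success(value)
      def onError(ex: Throwable): Unit = p.failure(ex)
    })
    p.future
  }
}

object Task {
  def apply[T](f: => T): Task[T] =
    unsafeCreate[T] { (s, cb) =>
      s.execute(new Runnable {
        // Evaluate f first, so an exception thrown by onSuccess is not reported as onError.
        def run(): Unit = Try(f) match {
          case Success(value) => cb.onSuccess(value)
          case Failure(ex)    => cb.onError(ex)
        }
      })
    }

  def unsafeCreate[T](f: (ExecutionContext, Callback[T]) => Unit): Task[T] =
    new Task[T] {
      override def unsafeRunFn(s: ExecutionContext, cb: Callback[T]): Unit = f(s, cb)
    }
}

// Calling-thread execution context, to keep the demo deterministic.
implicit val callingThread: ExecutionContext = new ExecutionContext {
  def execute(runnable: Runnable): Unit = runnable.run()
  def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}

var evaluated = 0
val task = Task { evaluated += 1; 20 }.map(_ + 1).flatMap(x => Task(x * 2))
val evaluatedBeforeRun = evaluated // 0: building the task ran nothing

val answer = Await.result(task.runAsync, 1.second) // 42
val failed = Task[Int](throw new IllegalStateException("boom")).map(_ + 1).runAsync
```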
Could you put together a side-by-side comparison between the design you're proposing here and Scalaz's Task? Something short, just bullet points with a few code snippets to illustrate usage.