This proposal is about Task, an alternative to Scala's Future, Scalaz's Task or C#'s Task.
Note this is not final. You can track the current progress on:
monifu/tree/task*.
We're using the following classes already defined in Monifu:
- Cancelable, along with these variants:
- Scheduler (an enhanced ExecutionContext)
- atomic references
A Task models a producer/consumer relationship in which the producer pushes a single value to the consumer on success, or an exception on error, and then stops. Thus we can have a consumer interface that mirrors the Observer in Rx, like so:
trait TaskCallback[-T] {
def scheduler: Scheduler
def onSuccess(value: T): Unit
def onError(ex: Throwable): Unit
}
The contract is this:
- on success the producer calls onSuccess exactly once, then stops
- on error the producer calls onError exactly once, then stops
- in case producing the value is very cheap (the value is already known), then execution of onSuccess is allowed to happen on the current thread (see Task.successful below)
- in case producing the value is expensive (if it's possible to block the current thread or to end up in a recursive loop), then execution should always be asynchronous and should preferably happen on the given Scheduler
- the contract is not meant for users, but for implementers that use Task.unsafeCreate
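The "exactly once, then stops" part of the contract can also be enforced defensively on the consumer side. What follows is only a sketch under stated assumptions: `SafeCallback` is a hypothetical name that is not part of the proposal, and the standard `ExecutionContext` stands in for Monifu's Scheduler so that the snippet compiles on its own.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.ExecutionContext

// Stand-in for the proposal's callback interface.
// Assumption: ExecutionContext replaces Monifu's Scheduler here.
trait TaskCallback[-T] {
  def scheduler: ExecutionContext
  def onSuccess(value: T): Unit
  def onError(ex: Throwable): Unit
}

// Hypothetical wrapper: forwards only the first signal it receives
// and silently drops every later one.
final class SafeCallback[-T](underlying: TaskCallback[T]) extends TaskCallback[T] {
  private val isDone = new AtomicBoolean(false)

  def scheduler: ExecutionContext = underlying.scheduler

  def onSuccess(value: T): Unit =
    if (isDone.compareAndSet(false, true)) underlying.onSuccess(value)

  def onError(ex: Throwable): Unit =
    if (isDone.compareAndSet(false, true)) underlying.onError(ex)
}
```

A wrapper like this does not excuse a producer that breaks the contract, but it keeps a misbehaving one from delivering two results downstream.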
So we can model our Task like this. Note the full interoperability with Scala's Future, and compare with the Scalaz Task:
/**
* For modeling asynchronous computations.
*/
trait Task[+T] { self =>
/**
* Characteristic function for our [[Task]].
*
* Method is not meant to be used directly.
* See [[Task.unsafeRun(f* unsafeRun(f)]] as an alternative.
*
* NOTE to implementors: `unsafeRun` should always execute asynchronously.
*/
def unsafeRun(c: TaskCallback[T]): Cancelable
/**
* Triggers the asynchronous execution.
*
* @param f is a function that will be called with the result on complete
* @return a [[Cancelable]] that can be used to cancel the in progress async computation
*/
def unsafeRun(f: Try[T] => Unit)(implicit s: Scheduler): Cancelable =
unsafeRun(new TaskCallback[T] {
val scheduler = s
def onError(ex: Throwable): Unit = f(Failure(ex))
def onSuccess(value: T): Unit = f(Success(value))
})
/**
* Returns a new Task that applies the mapping function to
* the element emitted by the source.
*/
def map[U](f: T => U): Task[U] =
Task.unsafeCreate[U] { callback =>
self.unsafeRun(new TaskCallback[T] {
val scheduler = callback.scheduler
def onSuccess(value: T): Unit = {
var streamError = true
try {
val u = f(value)
streamError = false
callback.onSuccess(u)
} catch {
case NonFatal(ex) if streamError =>
onError(ex)
}
}
def onError(ex: Throwable): Unit =
callback.onError(ex)
})
}
/**
* Given a source Task that emits another Task, this function
* flattens the result, returning a Task equivalent to the
* emitted Task by the source.
*/
def flatten[U](implicit ev: T <:< Task[U]): Task[U] =
Task.unsafeCreate { cb =>
val cancelable = MultiAssignmentCancelable.collapsible()
cancelable := self.unsafeRun(new TaskCallback[T] {
val scheduler = cb.scheduler
def onSuccess(value: T): Unit = {
cancelable := value.unsafeRun(cb)
}
def onError(ex: Throwable): Unit =
cb.onError(ex)
})
cancelable
}
/**
* Creates a new Task by applying a function to the successful
* result of the source Task, and returns a task equivalent to
* the result of the function.
*
* Calling `flatMap` is literally the equivalent of:
* {{{
* task.map(f).flatten
* }}}
*/
def flatMap[U](f: T => Task[U]): Task[U] =
map(f).flatten
/**
* Returns a task that on execution returns the result of the source
* task but delayed in time by the given `timespan`.
*/
def delay(timespan: FiniteDuration): Task[T] =
Task.unsafeCreate[T] { callback =>
val cancelable = MultiAssignmentCancelable()
cancelable := callback.scheduler.scheduleOnce(timespan,
new Runnable {
override def run(): Unit = {
cancelable := self.unsafeRun(callback)
}
})
cancelable
}
/**
* Returns a failed projection of this task.
*
* The failed projection is a task holding a value of type `Throwable`,
* emitting a value which is the throwable of the original task in
* case the original task fails, otherwise if the source succeeds, then
* it fails with a `NoSuchElementException`.
*/
def failed: Task[Throwable] =
Task.unsafeCreate { callback =>
self.unsafeRun(new TaskCallback[T] {
val scheduler = callback.scheduler
def onError(ex: Throwable): Unit =
callback.onSuccess(ex)
def onSuccess(value: T): Unit =
callback.onError(new NoSuchElementException("Task.failed"))
})
}
/**
* Creates a new task that will handle any matching throwable
* that this task might emit.
*/
def onErrorRecover[U >: T](pf: PartialFunction[Throwable, U]): Task[U] =
Task.unsafeCreate { callbackU =>
self.unsafeRun(new TaskCallback[T] {
val scheduler = callbackU.scheduler
def onError(ex: Throwable): Unit = {
var streamError = true
try {
if (pf.isDefinedAt(ex)) {
val u = pf(ex)
streamError = false
callbackU.onSuccess(u)
} else {
callbackU.onError(ex)
}
} catch {
case NonFatal(err) if streamError =>
callbackU.scheduler.reportFailure(ex)
callbackU.onError(err)
}
}
def onSuccess(value: T): Unit =
callbackU.onSuccess(value)
})
}
/**
* Creates a new task that will handle any matching throwable that this
* task might emit by executing another task.
*/
def onErrorRecoverWith[U >: T](pf: PartialFunction[Throwable, Task[U]]): Task[U] = {
Task.unsafeCreate { callbackU =>
val cancelable = MultiAssignmentCancelable()
cancelable := self.unsafeRun(new TaskCallback[T] {
val scheduler = callbackU.scheduler
def onError(ex: Throwable): Unit = {
var streamError = true
try {
if (pf.isDefinedAt(ex)) {
val newTask = pf(ex)
streamError = false
cancelable := newTask.unsafeRun(callbackU)
} else {
callbackU.onError(ex)
}
} catch {
case NonFatal(err) if streamError =>
callbackU.scheduler.reportFailure(ex)
callbackU.onError(err)
}
}
def onSuccess(value: T): Unit =
callbackU.onSuccess(value)
})
cancelable
}
}
/**
* Returns a Task that mirrors the source Task but that triggers a
* `TimeoutException` in case the given duration passes without the
* task emitting any item.
*/
def timeout(after: FiniteDuration): Task[T] =
Task.unsafeCreate { callback =>
val c = CompositeCancelable()
c += callback.scheduler.scheduleOnce(after,
new Runnable {
def run(): Unit = {
if (c.cancel())
callback.onError(new TimeoutException(
s"Task timed-out after $after of inactivity"))
}
})
c += self.unsafeRun(new TaskCallback[T] {
val scheduler = callback.scheduler
def onError(ex: Throwable): Unit =
if (c.cancel()) callback.onError(ex)
def onSuccess(value: T): Unit =
if (c.cancel()) callback.onSuccess(value)
})
Cancelable(c.cancel())
}
/**
* Returns a Task that mirrors the source Task but switches to
* the given backup Task in case the given duration passes without the
* source emitting any item.
*/
def timeout[U >: T](after: FiniteDuration, backup: Task[U]): Task[U] =
Task.unsafeCreate { callback =>
val isActive = CompositeCancelable()
val cancelable = MultiAssignmentCancelable(isActive)
isActive += callback.scheduler.scheduleOnce(after,
new Runnable {
def run(): Unit = {
if (isActive.cancel())
cancelable := backup.unsafeRun(callback)
}
})
isActive += self.unsafeRun(new TaskCallback[T] {
val scheduler = callback.scheduler
def onError(ex: Throwable): Unit =
if (isActive.cancel()) callback.onError(ex)
def onSuccess(value: T): Unit =
if (isActive.cancel()) callback.onSuccess(value)
})
cancelable
}
/**
* Zips the values of `this` and `that` task, and creates a new task that
* will emit the tuple of their results.
*/
def zip[U](that: Task[U]): Task[(T, U)] =
Task.unsafeCreate { callbackTU =>
val c = CompositeCancelable()
val state = Atomic(null : Either[T, U])
c += self.unsafeRun(
new TaskCallback[T] {
val scheduler = callbackTU.scheduler
def onError(ex: Throwable): Unit = {
if (c.cancel())
callbackTU.onError(ex)
}
@tailrec
def onSuccess(t: T): Unit =
state.get match {
case null =>
if (!state.compareAndSet(null, Left(t)))
onSuccess(t)
case Right(u) =>
callbackTU.onSuccess((t, u))
case Left(_) =>
()
}
})
c += that.unsafeRun(
new TaskCallback[U] {
val scheduler = callbackTU.scheduler
def onError(ex: Throwable): Unit = {
if (c.cancel())
callbackTU.onError(ex)
}
@tailrec
def onSuccess(u: U): Unit =
state.get match {
case null =>
if (!state.compareAndSet(null, Right(u)))
onSuccess(u)
case Left(t) =>
callbackTU.onSuccess((t, u))
case Right(_) =>
()
}
})
Cancelable(c.cancel())
}
}
object Task {
/**
* Returns a new task that, when executed, will emit the
* result of the given function executed asynchronously.
*/
def apply[T](f: => T): Task[T] =
Task.unsafeCreate { callback =>
val cancelable = BooleanCancelable()
callback.scheduler.execute(
new Runnable {
override def run(): Unit =
if (!cancelable.isCanceled) {
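// Evaluate `f` first, so that an exception thrown by
// `callback.onSuccess` is not also signaled through `onError`
var streamError = true
try {
val value = f
streamError = false
callback.onSuccess(value)
} catch {
case NonFatal(ex) if streamError =>
callback.onError(ex)
}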
}
})
cancelable
}
/**
* Builder for [[Task]] instances. Only use if you know what
* you're doing.
*/
def unsafeCreate[T](f: TaskCallback[T] => Cancelable): Task[T] =
new Task[T] {
def unsafeRun(c: TaskCallback[T]): Cancelable = f(c)
}
/**
* Returns a task that on execution is always successful,
* emitting the given element.
*/
def successful[T](elem: T): Task[T] =
Task.unsafeCreate { callback =>
val cancelable = BooleanCancelable()
callback.scheduler.execute(
new Runnable {
override def run(): Unit =
if (!cancelable.isCanceled) {
// `elem` is already evaluated, so nothing here can fail before
// `onSuccess`; catching its errors would signal the callback twice
callback.onSuccess(elem)
}
})
cancelable
}
/**
* Returns a task that on execution always finishes
* in error, emitting the specified exception.
*/
def error(ex: Throwable): Task[Nothing] =
Task.unsafeCreate { callback =>
val cancelable = BooleanCancelable()
callback.scheduler.execute(
new Runnable {
override def run(): Unit =
if (!cancelable.isCanceled) {
callback.onError(ex)
}
})
cancelable
}
}
Note that this triggers the actual execution using:
def unsafeRun(f: Try[T] => Unit)(implicit s: Scheduler): Cancelable
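To make the execution model concrete, here is a stripped-down, self-contained sketch. It is not the proposed implementation: `MiniTask` is a hypothetical stand-in with no Cancelable, and it uses the standard `ExecutionContext` in place of Scheduler. What it illustrates is that building a task runs nothing, and that every `unsafeRun` call evaluates the computation again.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

// Hypothetical, simplified stand-in for Task (no cancellation support).
final class MiniTask[+T](thunk: () => T) {
  // Schedules the computation and hands the outcome to `f`.
  def unsafeRun(f: Try[T] => Unit)(implicit ec: ExecutionContext): Unit =
    ec.execute(new Runnable {
      def run(): Unit = {
        val result = try Success(thunk()) catch { case NonFatal(ex) => Failure(ex) }
        f(result)
      }
    })

  // Describes a new task; nothing is evaluated at this point.
  def map[U](g: T => U): MiniTask[U] =
    new MiniTask(() => g(thunk()))
}

object MiniTaskDemo {
  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global

    val runs = new AtomicInteger(0)
    val task = new MiniTask(() => runs.incrementAndGet()).map(_ * 10)
    // At this point the counter is still 0: the task is only a description.

    val latch = new CountDownLatch(2)
    task.unsafeRun { r => println(r); latch.countDown() }
    task.unsafeRun { r => println(r); latch.countDown() }
    latch.await()

    // Each unsafeRun evaluated the thunk again, so the counter is now 2.
    println(runs.get)
  }
}
```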
Until now our API is very similar to purer Task implementations. But this doesn't cover all the use cases that Scala's Future has, and one problem is memoization. We may want to memoize the result of the last execution such that it can be shared by anybody who executes unsafeRun. So what if we tried building a version of our Task that memoizes the value? Would it look like this?
trait MemoizableTask[+T] extends Task[T] {
def isCompleted: Boolean
def value: Option[Try[T]]
// inherited from Task
def unsafeRun(c: TaskCallback[T]): Cancelable
}
object MemoizableTask {
/** Transforms any Task into a memoizable Task */
def apply[T](underlying: Task[T]): MemoizableTask[T] = ???
}
Oh, but then we've got a problem. The first unsafeRun execution will pay the full cost of the underlying Task execution, whereas unsafeRun executions subsequent to the completion will come for free.
And here we've got a design issue: if we are to memoize the result for subsequent unsafeRun executions, then it makes sense to start executing this task as soon as possible, as we want to memoize it as soon as possible. This is common sense, because we are doing this to share the result with multiple listeners. But if we are going to compute the result as fast as possible, that means starting when our task instance is created. And doing that means we need an execution context when our task is created, with the first unsafeRun being too late.
Then again, we've arrived at Scala's Future and so it makes no sense to duplicate the effort. We might as well convert to Scala's Future should we need it and be done with it.
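For reference, the behavior being described is what the standard library already provides: a Future starts running as soon as it is created (hence the execution context at construction time) and its result is shared by every later consumer. A small sketch using only `scala.concurrent`, with `FutureMemoizationDemo` being an illustrative name:

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FutureMemoizationDemo {
  def main(args: Array[String]): Unit = {
    val runs = new AtomicInteger(0)

    // The body is scheduled right here, at creation time.
    val future = Future { runs.incrementAndGet() }

    // Both consumers observe the same memoized result.
    val a = Await.result(future, 5.seconds)
    val b = Await.result(future.map(_ + 0), 5.seconds)

    // The body ran exactly once.
    println((a, b, runs.get)) // prints (1,1,1)
  }
}
```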
But I still like the ability to .cancel() for as long as the value hasn't been computed. This gives us the ability to, say, cancel delayed tasks, like timeouts. But then we can just extend from Scala's Future, because why not?
final class CancelableFuture[+T](underlying: Future[T], cancelable: Cancelable)
extends Future[T] with Cancelable {
// ...
}
object CancelableFuture {
def apply[T](task: Task[T])(implicit s: Scheduler): CancelableFuture[T] = {
val p = Promise[T]()
val c = SingleAssignmentCancelable()
val cancelable = Cancelable {
if (c.cancel())
p.tryFailure(new CancellationException)
}
c := task.unsafeRun(new TaskCallback[T] {
val scheduler = s
def onError(ex: Throwable): Unit =
if (c.cancel()) p.tryFailure(ex)
def onSuccess(value: T): Unit =
if (c.cancel()) p.trySuccess(value)
})
new CancelableFuture(p.future, cancelable)
}
}
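The same Promise-plus-cancel pattern can be tried out without Monifu. The sketch below is an assumption-laden illustration, not the proposed API: `DelayedFutureSketch.delayed` is a hypothetical helper, a plain `ScheduledExecutorService` stands in for Scheduler, and a `() => Boolean` function stands in for Cancelable. It shows a delayed computation being canceled before it produces a value.

```scala
import java.util.concurrent.{CancellationException, Executors, TimeUnit}
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object DelayedFutureSketch {
  // Hypothetical helper: returns the future together with a cancel function.
  // The cancel function returns true only if it was the one to complete the promise.
  def delayed[T](delay: FiniteDuration)(value: => T): (Future[T], () => Boolean) = {
    val timer = Executors.newSingleThreadScheduledExecutor()
    val p = Promise[T]()

    val handle = timer.schedule(new Runnable {
      def run(): Unit = {
        p.tryComplete(Try(value))
        timer.shutdown()
      }
    }, delay.toMillis, TimeUnit.MILLISECONDS)

    val cancel = () => {
      handle.cancel(false) // drop the pending timer task, if still pending
      timer.shutdown()     // release the timer thread
      p.tryFailure(new CancellationException("canceled before completion"))
    }

    (p.future, cancel)
  }

  def main(args: Array[String]): Unit = {
    val (future, cancel) = delayed(10.seconds)("late")
    println(cancel()) // true: the value had not been computed yet
    println(Await.ready(future, 1.second).value.exists(_.isFailure)) // true
  }
}
```

Consumers of the future only ever see a standard `Future[T]`; cancellation shows up to them as a failure with `CancellationException`, as in the `CancelableFuture.apply` code above.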
And then on our Task, we can have a brand new unsafeRun:
trait Task[+T] { self =>
def unsafeRun(implicit s: Scheduler): CancelableFuture[T] =
CancelableFuture(this)
// ...
}
And now we've got interoperability with the standard Future, which surely beats callbacks.
Could you put together a side-by-side comparison between the design you're proposing here and Scalaz's Task? Something short, just bullet points with a few code snippets to illustrate usage.