This proposal is about Task
, an alternative for Scala's Future
, Scalaz's Task
or C#'s Task
.
We're using the following classes already defined in Monifu:
- Cancelable, along with these variants:
- Scheduler (an enhanced
ExecutionContext
) - atomic references
A Task
models a producer/consumer relationship in which the producer pushes
a single value to the consumer on success, or an exception on error and then stops.
Thus we can have a consumer interface that mirrors the Observer
in Rx, like so:
trait Callback[-T] {
def scheduler: Scheduler
def onSuccess(value: T): Unit
def onError(ex: Throwable): Unit
}
The contract is this:
- on success the producer calls
onSuccess
exactly once, then stops - on error the producer calls
onError
exactly once, then stops - in case producing the
value
is very cheap (value is already known), then execution ofonSuccess
is allowed to happen on the current thread (seeTask.successfull
below) - in case producing the
value
is expensive (if it's possible to block the current thread or to end up in a recursive loop), then execution should always be asynchronous and should preferably happen on the given Scheduler - the contract is not meant for users, but for implementers that use
Task.unsafeCreate
So we can model our task like this, and note the full interoperability with Scala's Future and compare with the Scalaz Task:
/**
* For modeling asynchronous computations.
*/
trait Task[+T] { self =>
/**
* Characteristic function for our [[Task]].
*/
def unsafeRun(c: Callback[T]): Cancelable
/**
* Triggers asynchronous execution.
*/
def unsafeRun(f: Try[T] => Unit)(implicit s: Scheduler): Cancelable =
unsafeRun(new Callback[T] {
val scheduler = s
def onError(ex: Throwable): Unit = f(Failure(ex))
def onSuccess(value: T): Unit = f(Success(value))
})
/**
* Returns a new Task that applies the mapping function to
* the element emitted by the source.
*/
def map[U](f: T => U): Task[U] =
Task.unsafeCreate[U] { callback =>
self.unsafeRun(new Callback[T] {
val scheduler = callback.scheduler
def onSuccess(value: T): Unit = {
var streamError = true
try {
val u = f(value)
streamError = false
callback.onSuccess(u)
} catch {
case NonFatal(ex) if streamError =>
onError(ex)
}
}
def onError(ex: Throwable): Unit =
callback.onError(ex)
})
}
/**
* Given a source Task that emits another Task, this function
* flattens the result, returning a Task equivalent to the
* emitted Task by the source.
*/
def flatten[U](implicit ev: T <:< Task[U]): Task[U] =
Task.unsafeCreate { callbackU =>
val cancelable = MultiAssignmentCancelable()
cancelable := self.unsafeRun(new Callback[T] {
val scheduler = callbackU.scheduler
def onSuccess(value: T): Unit =
cancelable := value.unsafeRun(callbackU)
def onError(ex: Throwable): Unit =
callbackU.onError(ex)
})
cancelable
}
/**
* Creates a new Task by applying a function to the successful
* result of the source Task, and returns a task equivalent to
* the result of the function.
*
* Calling `flatMap` is literally the equivalent of:
* {{{
* task.map(f).flatten
* }}}
*/
def flatMap[U](f: T => Task[U]): Task[U] =
map(f).flatten
/**
* Returns a task that on execution returns the result of the source
* task but delayed in time by the given `timestamp`.
*/
def delay(timespan: FiniteDuration): Task[T] =
Task.unsafeCreate[T] { callback =>
val cancelable = MultiAssignmentCancelable()
cancelable := callback.scheduler.scheduleOnce(timespan,
new Runnable {
override def run(): Unit = {
cancelable := self.unsafeRun(callback)
}
})
cancelable
}
/**
* Returns a failed projection of this task.
*
* The failed projection is a future holding a value of type `Throwable`,
* emitting a value which is the throwable of the original task in
* case the original task fails, otherwise if the source succeeds, then
* it fails with a `NoSuchElementException`.
*/
def failed: Task[Throwable] =
Task.unsafeCreate { callback =>
self.unsafeRun(new Callback[T] {
val scheduler = callback.scheduler
def onError(ex: Throwable): Unit =
callback.onSuccess(ex)
def onSuccess(value: T): Unit =
callback.onError(new NoSuchElementException("Task.failed"))
})
}
/**
* Creates a new task that will handle any matching throwable
* that this task might emit.
*/
def onErrorRecover[U >: T](pf: PartialFunction[Throwable, U]): Task[U] =
Task.unsafeCreate { callbackU =>
self.unsafeRun(new Callback[T] {
val scheduler = callbackU.scheduler
def onError(ex: Throwable): Unit = {
var streamError = false
try {
if (pf.isDefinedAt(ex)) {
val u = pf(ex)
streamError = true
callbackU.onSuccess(u)
} else {
callbackU.onError(ex)
}
} catch {
case NonFatal(err) if streamError =>
callbackU.scheduler.reportFailure(ex)
callbackU.onError(err)
}
}
def onSuccess(value: T): Unit =
callbackU.onSuccess(value)
})
}
/**
* Creates a new task that will handle any matching throwable that this
* task might emit by executing another task.
*/
def onErrorRecoverWith[U >: T](pf: PartialFunction[Throwable, Task[U]]): Task[U] = {
Task.unsafeCreate { callbackU =>
val cancelable = MultiAssignmentCancelable()
cancelable := self.unsafeRun(new Callback[T] {
val scheduler = callbackU.scheduler
def onError(ex: Throwable): Unit = {
var streamError = true
try {
if (pf.isDefinedAt(ex)) {
val newTask = pf(ex)
streamError = false
cancelable := newTask.unsafeRun(callbackU)
} else {
callbackU.onError(ex)
}
} catch {
case NonFatal(err) if streamError =>
callbackU.scheduler.reportFailure(ex)
callbackU.onError(err)
}
}
def onSuccess(value: T): Unit =
callbackU.onSuccess(value)
})
cancelable
}
}
/**
* Returns a Task that mirrors the source Task but that triggers a
* `TimeoutException` in case the given duration passes without the
* task emitting any item.
*/
def timeout(after: FiniteDuration): Task[T] =
Task.unsafeCreate { callback =>
val c = CompositeCancelable()
c += callback.scheduler.scheduleOnce(after,
new Runnable {
def run(): Unit = {
if (c.cancel())
callback.onError(new TimeoutException(
s"Task timed-out after $after of inactivity"))
}
})
c += self.unsafeRun(new Callback[T] {
val scheduler = callback.scheduler
def onError(ex: Throwable): Unit =
if (c.cancel()) callback.onError(ex)
def onSuccess(value: T): Unit =
if (c.cancel()) callback.onSuccess(value)
})
Cancelable(c.cancel())
}
/**
* Returns a Task that mirrors the source Task but switches to
* the given backup Task in case the given duration passes without the
* source emitting any item.
*/
def timeout[U >: T](after: FiniteDuration, backup: Task[U]): Task[U] =
Task.unsafeCreate { callback =>
val isActive = CompositeCancelable()
val cancelable = MultiAssignmentCancelable(isActive)
isActive += callback.scheduler.scheduleOnce(after,
new Runnable {
def run(): Unit = {
if (isActive.cancel())
cancelable := backup.unsafeRun(callback)
}
})
isActive += self.unsafeRun(new Callback[T] {
val scheduler = callback.scheduler
def onError(ex: Throwable): Unit =
if (isActive.cancel()) callback.onError(ex)
def onSuccess(value: T): Unit =
if (isActive.cancel()) callback.onSuccess(value)
})
cancelable
}
/**
* Zips the values of `this` and `that` task, and creates a new task that
* will emit the tuple of their results.
*/
def zip[U](that: Task[U]): Task[(T, U)] =
Task.unsafeCreate { callbackTU =>
val c = CompositeCancelable()
val state = Atomic(null : Either[T, U])
c += self.unsafeRun(
new Callback[T] {
val scheduler = callbackTU.scheduler
def onError(ex: Throwable): Unit = {
if (c.cancel())
callbackTU.onError(ex)
}
@tailrec
def onSuccess(value: T): Unit =
state.get match {
case null =>
if (!state.compareAndSet(null, Left(value)))
onSuccess(value)
case Right(u) =>
callbackTU.onSuccess((value, u))
case Left(_) =>
()
}
})
c += that.unsafeRun(
new Callback[U] {
val scheduler = callbackTU.scheduler
def onError(ex: Throwable): Unit = {
if (c.cancel())
callbackTU.onError(ex)
}
@tailrec
def onSuccess(value: U): Unit =
state.get match {
case null =>
if (!state.compareAndSet(null, Right(value)))
onSuccess(value)
case Left(l) =>
callbackTU.onSuccess((l, value))
case Right(_) =>
()
}
})
Cancelable(c.cancel())
}
/**
* Converts this task into a Scala `Future`, triggering
* its execution.
*/
def asFuture(implicit s: Scheduler): Future[T] = {
val p = Promise[T]()
self.unsafeRun(new Callback[T] {
val scheduler = s
def onError(ex: Throwable): Unit =
p.tryFailure(ex)
def onSuccess(value: T): Unit =
p.trySuccess(value)
})
p.future
}
}
object Task {
/**
* Returns a new task that, when executed, will emit the
* result of the given function executed asynchronously.
*/
def apply[T](f: => T): Task[T] =
Task.unsafeCreate { callback =>
val cancelable = BooleanCancelable()
callback.scheduler.execute(
new Runnable {
override def run(): Unit =
if (!cancelable.isCanceled) {
try callback.onSuccess(f) catch {
case NonFatal(ex) =>
callback.onError(ex)
}
}
})
cancelable
}
/**
* Builder for [[Task]] instances. Only use if you know what
* you're doing.
*/
def unsafeCreate[T](f: Callback[T] => Cancelable): Task[T] =
new Task[T] {
def unsafeRun(c: Callback[T]): Cancelable = f(c)
}
/**
* Returns a task that on execution is always successful,
* emitting the given element.
*/
def successful[T](elem: T): Task[T] =
Task.unsafeCreate { callback =>
try callback.onSuccess(elem) catch {
case NonFatal(ex) =>
callback.onError(ex)
}
Cancelable.empty
}
/**
* Returns a task that on execution is always finishing
* in error emitting the specified exception.
*/
def error(ex: Throwable): Task[Nothing] =
Task.unsafeCreate { c =>
c.onError(ex)
Cancelable.empty
}
/**
* Converts the given `Future` into a `Task`.
*/
def fromFuture[T](f: => Future[T]): Task[T] =
Task.unsafeCreate { callback =>
implicit val s = callback.scheduler
val cancelable = Cancelable()
f.onComplete {
case Success(value) =>
if (cancelable.cancel())
callback.onSuccess(value)
case Failure(ex) =>
if (cancelable.cancel())
callback.onError(ex)
}
cancelable
}
}
Until now our API is very similar to more pure Task implementations.
But this doesn't cover all usecases that Scala's Future have and one problem is
one of memoization. We may want to memoize the result of the last execution
such that it can be shared by anybody who executes unsafeRun
. So what if
we build a version of our Task that can do just that:
trait MemoizableTask[+T] extends Task[T] {
def isCompleted: Boolean
def value: Option[Try[T]]
// inherited from Task
def unsafeRun(c: Callback[T]): Cancelable
}
object MemoizableTask {
/** Transforms any Task into a memoizable Task */
def apply(underlying: Task[T]): MemoizableTask[T]
}
Oh, but then we've got a problem: the first unsafeRun
execution will pay the
full cost of the underlying Task execution, whereas our unsafeRun
executions subsequent to our task being completed will come for free. And here
we've got these design issues:
- even though returning a
Cancelable
onunsafeRun
is common sense, we can no longer returnCancelable
onunsafeRun
, because we are supposed to memoize the result and reuse it for multiple consumers, so once created, it makes no sense to give the user the ability to cancel - not returning a
Cancelable
onunsafeRun
highlights another fact: if we are to memoize the result for subsequentunsafeRun
executions, then it makes sense to start executing this task as soon as possible, as we want to memoize it as soon as possible
But then again, we've arrived at Scala's Future
and so it makes no sense to
duplicate the effort. We might as well convert to Scala's Future
should we need it and be done with it:
trait Task[+T] {
def unsafeRun(c: Callback[T]): Cancelable
// ...
/**
* Converts this task into a Scala `Future`, triggering
* its execution.
*/
def asFuture(implicit s: Scheduler): Future[T] = {
val p = Promise[T]()
self.unsafeRun(new Callback[T] {
val scheduler = s
def onError(ex: Throwable): Unit =
p.tryFailure(ex)
def onSuccess(value: T): Unit =
p.trySuccess(value)
})
p.future
}
}
Oh wait, what just happened? Do we have a Task
that's complementary to Scala's Future
? Yes we do.
Could you put together a side-by-side comparison between the design you're proposing here and Scalaz's
Task
? Something short, just bullet points with a few code snippets to illustrate usage.