Skip to content

Instantly share code, notes, and snippets.

@alexandru
Last active April 1, 2018 14:57
Show Gist options
  • Save alexandru/55a6038c2fe61025d555 to your computer and use it in GitHub Desktop.
Save alexandru/55a6038c2fe61025d555 to your computer and use it in GitHub Desktop.
Task: A diverging design from Future and Scalaz Task

Task Proposal (for Monifu)

This proposal is about Task, an alternative for Scala's Future, Scalaz's Task or C#'s Task. Note this is not final. You can track the current progress on: monifu/tree/task*.

We're using the following classes already defined in Monifu:

A Task models a producer/consumer relationship in which the producer pushes a single value to the consumer on success, or an exception on error and then stops. Thus we can have a consumer interface that mirrors the Observer in Rx, like so:

trait Callback[-T] {
  def scheduler: Scheduler
  def onSuccess(value: T): Unit
  def onError(ex: Throwable): Unit
}

The contract is this:

  • on success the producer calls onSuccess exactly once, then stops
  • on error the producer calls onError exactly once, then stops
  • in case producing the value is very cheap (value is already known), then execution of onSuccess is allowed to happen on the current thread (see Task.successfull below)
  • in case producing the value is expensive (if it's possible to block the current thread or to end up in a recursive loop), then execution should always be asynchronous and should preferably happen on the given Scheduler
  • the contract is not meant for users, but for implementers that use Task.unsafeCreate

So we can model our task like this, and note the full interoperability with Scala's Future and compare with the Scalaz Task:

/**
 * For modeling asynchronous computations.
 */
trait Task[+T] { self =>
  /**
   * Characteristic function for our [[Task]].
   *
   * Method is not meant to be used directly.
   * See [[Task.unsafeRun(f* unsafeRun(f)]] as an alternative.
   *
   * NOTE to implementors: `unsafeRun` should always execute asynchronously.
   */
  def unsafeRun(c: TaskCallback[T]): Cancelable

  /**
   * Triggers the asynchronous execution.
   *
   * @param f is a function that will be called with the result on complete
   * @return a [[Cancelable]] that can be used to cancel the in progress async computation
   */
  def unsafeRun(f: Try[T] => Unit)(implicit s: Scheduler): Cancelable =
    unsafeRun(new TaskCallback[T] {
      val scheduler = s
      def onError(ex: Throwable): Unit = f(Failure(ex))
      def onSuccess(value: T): Unit = f(Success(value))
    })

  /**
   * Returns a new Task that applies the mapping function to
   * the element emitted by the source.
   */
  def map[U](f: T => U): Task[U] =
    Task.unsafeCreate[U] { callback =>
      self.unsafeRun(new TaskCallback[T] {
        val scheduler = callback.scheduler

        def onSuccess(value: T): Unit = {
          var streamError = true
          try {
            val u = f(value)
            streamError = false
            callback.onSuccess(u)
          } catch {
            case NonFatal(ex) if streamError =>
              onError(ex)
          }
        }

        def onError(ex: Throwable): Unit =
          callback.onError(ex)
      })
    }

  /**
   * Given a source Task that emits another Task, this function
   * flattens the result, returning a Task equivalent to the
   * emitted Task by the source.
   */
  def flatten[U](implicit ev: T <:< Task[U]): Task[U] =
    Task.unsafeCreate { cb =>
      val cancelable = MultiAssignmentCancelable.collapsible()

      cancelable := self.unsafeRun(new TaskCallback[T] {
        val scheduler = cb.scheduler
        def onSuccess(value: T): Unit = {
          cancelable := value.unsafeRun(cb)
        }

        def onError(ex: Throwable): Unit =
          cb.onError(ex)
      })

      cancelable
    }

  /**
   * Creates a new Task by applying a function to the successful
   * result of the source Task, and returns a task equivalent to
   * the result of the function.
   *
   * Calling `flatMap` is literally the equivalent of:
   * {{{
   *   task.map(f).flatten
   * }}}
   */
  def flatMap[U](f: T => Task[U]): Task[U] =
    map(f).flatten

  /**
   * Returns a task that on execution returns the result of the source
   * task but delayed in time by the given `timestamp`.
   */
  def delay(timespan: FiniteDuration): Task[T] =
    Task.unsafeCreate[T] { callback =>
      val cancelable = MultiAssignmentCancelable()
      cancelable := callback.scheduler.scheduleOnce(timespan,
        new Runnable {
          override def run(): Unit = {
            cancelable := self.unsafeRun(callback)
          }
        })

      cancelable
    }

  /**
   * Returns a failed projection of this task.
   *
   * The failed projection is a future holding a value of type `Throwable`,
   * emitting a value which is the throwable of the original task in
   * case the original task fails, otherwise if the source succeeds, then
   * it fails with a `NoSuchElementException`.
   */
  def failed: Task[Throwable] =
    Task.unsafeCreate { callback =>
      self.unsafeRun(new TaskCallback[T] {
        val scheduler = callback.scheduler

        def onError(ex: Throwable): Unit =
          callback.onSuccess(ex)

        def onSuccess(value: T): Unit =
          callback.onError(new NoSuchElementException("Task.failed"))
      })
    }

  /**
   * Creates a new task that will handle any matching throwable
   * that this task might emit.
   */
  def onErrorRecover[U >: T](pf: PartialFunction[Throwable, U]): Task[U] =
    Task.unsafeCreate { callbackU =>
      self.unsafeRun(new TaskCallback[T] {
        val scheduler = callbackU.scheduler

        def onError(ex: Throwable): Unit = {
          var streamError = false
          try {
            if (pf.isDefinedAt(ex)) {
              val u = pf(ex)
              streamError = true
              callbackU.onSuccess(u)
            } else {
              callbackU.onError(ex)
            }
          } catch {
            case NonFatal(err) if streamError =>
              callbackU.scheduler.reportFailure(ex)
              callbackU.onError(err)
          }
        }

        def onSuccess(value: T): Unit =
          callbackU.onSuccess(value)
      })
    }

  /**
   * Creates a new task that will handle any matching throwable that this
   * task might emit by executing another task.
   */
  def onErrorRecoverWith[U >: T](pf: PartialFunction[Throwable, Task[U]]): Task[U] = {
    Task.unsafeCreate { callbackU =>
      val cancelable = MultiAssignmentCancelable()

      cancelable := self.unsafeRun(new TaskCallback[T] {
        val scheduler = callbackU.scheduler

        def onError(ex: Throwable): Unit = {
          var streamError = true
          try {
            if (pf.isDefinedAt(ex)) {
              val newTask = pf(ex)
              streamError = false
              cancelable := newTask.unsafeRun(callbackU)
            } else {
              callbackU.onError(ex)
            }
          } catch {
            case NonFatal(err) if streamError =>
              callbackU.scheduler.reportFailure(ex)
              callbackU.onError(err)
          }
        }

        def onSuccess(value: T): Unit =
          callbackU.onSuccess(value)
      })

      cancelable
    }
  }

  /**
   * Returns a Task that mirrors the source Task but that triggers a
   * `TimeoutException` in case the given duration passes without the
   * task emitting any item.
   */
  def timeout(after: FiniteDuration): Task[T] =
    Task.unsafeCreate { callback =>
      val c = CompositeCancelable()

      c += callback.scheduler.scheduleOnce(after,
        new Runnable {
          def run(): Unit = {
            if (c.cancel())
              callback.onError(new TimeoutException(
                s"Task timed-out after $after of inactivity"))
          }
        })

      c += self.unsafeRun(new TaskCallback[T] {
        val scheduler = callback.scheduler
        def onError(ex: Throwable): Unit =
          if (c.cancel()) callback.onError(ex)
        def onSuccess(value: T): Unit =
          if (c.cancel()) callback.onSuccess(value)
      })

      Cancelable(c.cancel())
    }

  /**
   * Returns a Task that mirrors the source Task but switches to
   * the given backup Task in case the given duration passes without the
   * source emitting any item.
   */
  def timeout[U >: T](after: FiniteDuration, backup: Task[U]): Task[U] =
    Task.unsafeCreate { callback =>
      val isActive = CompositeCancelable()
      val cancelable = MultiAssignmentCancelable(isActive)

      isActive += callback.scheduler.scheduleOnce(after,
        new Runnable {
          def run(): Unit = {
            if (isActive.cancel())
              cancelable := backup.unsafeRun(callback)
          }
        })

      isActive += self.unsafeRun(new TaskCallback[T] {
        val scheduler = callback.scheduler
        def onError(ex: Throwable): Unit =
          if (isActive.cancel()) callback.onError(ex)
        def onSuccess(value: T): Unit =
          if (isActive.cancel()) callback.onSuccess(value)
      })

      cancelable
    }

  /**
   * Zips the values of `this` and `that` task, and creates a new task that
   * will emit the tuple of their results.
   */
  def zip[U](that: Task[U]): Task[(T, U)] =
    Task.unsafeCreate { callbackTU =>
      val c = CompositeCancelable()
      val state = Atomic(null : Either[T, U])

      c += self.unsafeRun(
        new TaskCallback[T] {
          val scheduler = callbackTU.scheduler
          def onError(ex: Throwable): Unit = {
            if (c.cancel())
              callbackTU.onError(ex)
          }

          @tailrec
          def onSuccess(t: T): Unit =
            state.get match {
              case null =>
                if (!state.compareAndSet(null, Left(t)))
                  onSuccess(t)
              case Right(u) =>
                callbackTU.onSuccess((t, u))
              case Left(_) =>
                ()
            }
        })


      c += that.unsafeRun(
        new TaskCallback[U] {
          val scheduler = callbackTU.scheduler
          def onError(ex: Throwable): Unit = {
            if (c.cancel())
              callbackTU.onError(ex)
          }

          @tailrec
          def onSuccess(u: U): Unit =
            state.get match {
              case null =>
                if (!state.compareAndSet(null, Right(u)))
                  onSuccess(u)
              case Left(t) =>
                callbackTU.onSuccess((t, u))
              case Right(_) =>
                ()
            }
        })

      Cancelable(c.cancel())
    }
}

object Task {
  /**
   * Returns a new task that, when executed, will emit the
   * result of the given function executed asynchronously.
   */
  def apply[T](f: => T): Task[T] =
    Task.unsafeCreate { callback =>
      val cancelable = BooleanCancelable()
      callback.scheduler.execute(
        new Runnable {
          override def run(): Unit =
            if (!cancelable.isCanceled) {
              try callback.onSuccess(f) catch {
                case NonFatal(ex) =>
                  callback.onError(ex)
              }
            }
        })

      cancelable
    }

  /**
   * Builder for [[Task]] instances. Only use if you know what
   * you're doing.
   */
  def unsafeCreate[T](f: TaskCallback[T] => Cancelable): Task[T] =
    new Task[T] {
      def unsafeRun(c: TaskCallback[T]): Cancelable = f(c)
    }

  /**
   * Returns a task that on execution is always successful,
   * emitting the given element.
   */
  def successful[T](elem: T): Task[T] =
    Task.unsafeCreate { callback =>
      val cancelable = BooleanCancelable()
      callback.scheduler.execute(
        new Runnable {
          override def run(): Unit =
            if (!cancelable.isCanceled) {
              try callback.onSuccess(elem) catch {
                case NonFatal(ex) =>
                  callback.onError(ex)
              }
            }
        })

      cancelable
    }

  /**
   * Returns a task that on execution is always finishing
   * in error emitting the specified exception.
   */
  def error(ex: Throwable): Task[Nothing] =
    Task.unsafeCreate { callback =>
      val cancelable = BooleanCancelable()
      callback.scheduler.execute(
        new Runnable {
          override def run(): Unit =
            if (!cancelable.isCanceled) {
              callback.onError(ex)
            }
        })

      cancelable
    }
}

Note that this triggers the actual execution using:

def unsafeRun(f: Try[T] => Unit)(implicit s: Scheduler): Cancelable

Until now our API is very similar to more pure Task implementations. But this doesn't cover all usecases that Scala's Future have and one problem is one of memoization. We may want to memoize the result of the last execution such that it can be shared by anybody who executes unsafeRun. So what if we tried building a version of our Task that memoizes the value?

Would it look like this?

trait MemoizableTask[+T] extends Task[T] {
  def isCompleted: Boolean
  def value: Option[Try[T]]
  
  // inherited from Task
  def unsafeRun(c: Callback[T]): Cancelable
}

object MemoizableTask {
  /** Transforms any Task into a memoizable Task */
  def apply(underlying: Task[T]): MemoizableTask[T] = ???
}

Oh, but then we've got a problems. The first unsafeRun execution will pay the full cost of the underlying Task execution, whereas our unsafeRun executions subsequent to the completion will come for free.

And here we've got a design issue: if we are to memoize the result for subsequent unsafeRun executions, then it makes sense to start executing this task as soon as possible, as we want to memoize it as soon as possible. This is common sense, because we are doing this to share the result with multiple listeners. But if we are going to compute the result as fast as possible, that means when our task instance is created. And doing that means we need an execution context when our task is created, with the first unsafeRun being too late.

Then again, we've arrived at Scala's Future and so it makes no sense to duplicate the effort. We might as well convert to Scala's Future should we need it and be done with it.

But I still like the ability to .cancel() for as long as the value hasn't been computed. This gives us the ability to say, cancel delayed tasks, like timeouts. But then we can just extend from Scala's Future, because why not?

final class CancelableFuture[+T](underlying: Future[T], cancelable: Cancelable)
  extends Future[T] with Cancelable {
  
  // ...
}

object CancelableFuture {
  def apply[T](task: Task[T])(implicit s: Scheduler): CancelableFuture[T] = {
    val p = Promise[T]()
    val c = SingleAssignmentCancelable()
    val cancelable = Cancelable {
      if (c.cancel())
        p.tryFailure(new CancellationException)
    }

    c := task.unsafeRun(new TaskCallback[T] {
      val scheduler = s
      def onError(ex: Throwable): Unit =
        if (c.cancel()) p.tryFailure(ex)
      def onSuccess(value: T): Unit =
        if (c.cancel()) p.trySuccess(value)
    })

    new CancelableFuture(p.future, cancelable)
  }
}

And then on our Task, we can have a brand new unsafeRun:

trait Task[+T] { self =>

  def unsafeRun(implicit s: Scheduler): CancelableFuture[T] =
    CancelableFuture(this)
    
  // ...
}

And now we've got interoperability with the standard Future, which surely beats callbacks.

@milessabin
Copy link

Could you put together a side-by-side comparison between the design you're proposing here and Scalaz's Task? Something short, just bullet points with a few code snippets to illustrate usage.

@alexandru
Copy link
Author

Yes, I will. I'm currently fleshing out the implementation, since this didn't exist yesterday morning and after that I'll provide a comparison :-)

@alexandru
Copy link
Author

Updated :-)

@tpolecat
Copy link

Thanks for this. Very interesting.

Can you clarify whether these programs the same or not?

val x =
  for {
    _ <- Task { println("hi") }
    _ <- Task { println("hi") }
  } yield ()

and

val hi = Task { println("hi") }
val x =
  for {
  _ <- hi
  _ <- hi
} yield ()

@adelbertc
Copy link

Depending on how the final implementation looks, perhaps you may also want to see how suitable it may be to be in Cats itself? :-)

That being said, we do have a WIP branch here - I believe the progress stopped at making it stack safe, with tests showing it StackOverflowError-ing.

@alexandru
Copy link
Author

@tpolecat yes, it behaves like you'd expect, the two programs are the same.

@adelbertc sure; btw, this implementation is stack safe

@ngbinh
Copy link

ngbinh commented Dec 23, 2015

looks great! 👍

Copy link

ghost commented Dec 23, 2015

@tpolecat re question from cats gitter: "is there a better place to discuss this?"

@adelbertc re WIP branch

Both points were brought up by @non in https://github.com/non/cats/issues/32#issuecomment-141079117, but that was a while ago. Specifically:

I think using this issue to brainstorm, ask questions, and help try to flesh out the requirements of a task in Cats would definitely be helpful.

and

Thus, feature/task (in its current state) is not eligible to be merged.

Just as a suggestion, perhaps the old issue has got stale and rather than try and reboot it, it might be worth launching a V2 issue?

@alexandru
Copy link
Author

👍 @inthenow.

I have updated the document. It now contains a list of what this implementation does better than the Task in Scalaz, as I think @milessabin wanted. See the "What's better than Scalaz?" section.

Copy link

ghost commented Dec 23, 2015

Cool. Before I/we do this, there are two issues regarding moving the current Future instances from Cats to Alleycats - non/alleycats#26 and https://github.com/non/cats/issues/589.

Given this - would Alleycats be the better place to open the new issue and possible first implementation?

@alexandru
Copy link
Author

Changed again (Dec 30).

@puffnfresh
Copy link

scalaz.concurrent.Task is not very good. Using just a reasonable IO type, it should be possible to subsume Task's use cases with some uses of MVar-like atomic-references. For example:

scalaz/scalaz@series/7.3.x...puffnfresh:feature/concurrent-io

I highly recommend focusing on doing a good job at I/O and then using that to talk about concurrency, rather than trying to do the opposite.

@gpampara
Copy link

@puffnfresh: 👍

@aloiscochard
Copy link

Hi all,

You might be interested to look at my matterhorn experiment: https://github.com/aloiscochard/matterhorn

It is basically a specialized interpreter for IO actions: https://github.com/aloiscochard/matterhorn/blob/master/core/src/main/scala/rts/Interpreter.scala

There is some tests and benchmarks showing how it work:
https://github.com/aloiscochard/matterhorn/blob/master/core/src/test/scala/CoreSpec.scala
https://github.com/aloiscochard/matterhorn/blob/master/bench/src/main/scala/pure.scala

The approach to concurrency is to use a STM, I just did plug a library for having rapid prototyping.

Good luck

@alexandru
Copy link
Author

@aloiscochard, @puffnfresh thanks, will take a look.

@pchiusano
Copy link

scalaz.concurrent.Task is not very good. Using just a reasonable IO type, it should be possible to subsume Task's use cases with some uses of MVar-like atomic-references. For example:

I agree, and that is the route we are going with FS2's Task type (which I suppose could be renamed to 'IO'), which implements MVar-like references and then uses that to implement async (and lots of other operations, like parallelTraverse, as well as all the concurrent stream operations of FS2 itself): https://github.com/functional-streams-for-scala/fs2/blob/topic/redesign/core/src/main/scala/fs2/Async.scala#L75

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment