Task: A diverging design from Future and Scalaz Task

Task Proposal, for Monifu (or Cats? would be cool)

This proposal is about Task, an alternative to Scala's Future, Scalaz's Task or C#'s Task. Note that this is a work in progress. You can track the current progress on: monifu/tree/task. The implementation currently consists of these classes:

  • Task (new)
  • Scheduler (an enhanced ExecutionContext, capable of scheduleOnce(delay), already in Monifu)
  • Trampoline (new, internal)
  • EvictingQueue (internal, an array-backed circular FIFO queue, used by our trampoline, already in Monifu)
  • atomic references (already in Monifu)

Design, Comparison

Versus Scala's Future

I view Task as complementary to Scala's Future, not as a replacement. See below for details. The problem we are trying to solve is that Future represents a running computation that memoizes its result. It's perfect for its primary use-cases, but it is sometimes misused and misunderstood.

  • every one of Future's operators takes an implicit ExecutionContext (making it slightly incompatible with various type-class interfaces)
  • the reason is that every one of Future's operators is side-effecting, immediately submitting a task to a thread-pool for execution (see the sketch after this list)
  • it goes without saying that Future does not have referential transparency
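
To make the contrast concrete, here is a minimal sketch (the Task import is elided, as in the other snippets in this document):

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// Future is eager and memoized: the println runs right here,
// at construction time, and the result is cached
val future = Future { println("side-effect"); 1 + 1 }

// Task is lazy: this merely describes the computation; nothing runs
// and no side-effect happens until runAsync is called
val task = Task { println("side-effect"); 1 + 1 }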

Monifu's Task fixes these problems. Well, for the FP purists, do note that Monifu's Task still breaks the left identity monad law: the law does not hold for functions f that throw exceptions (just like Try and Future before it):

// for some f: T => Task[U] that may throw exceptions
Task(t).flatMap(f) === f(t) // not true when f throws

But for me that's unavoidable, and I don't even want to attempt fixing it just to tick a checkbox: Task is about asynchronous computations, and that means dealing with non-determinism (on the underlying Java or Javascript platforms).
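
To make the broken law concrete, here is a sketch (f is hypothetical; the assumption is that flatMap catches exceptions thrown by user code, just like Future does):

val f: Int => Task[Int] =
  _ => throw new RuntimeException("boom")

// left identity would require these two expressions to be equivalent:
f(1)               // throws right here, eagerly
Task(1).flatMap(f) // only fails when the resulting task is run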

Versus Scalaz's Task

I must say that I was impressed with the design of the Scalaz Task. The good parts of the Scalaz Task that my Task also shares:

  • I originally ignored stack overflows, but the authors of Scalaz Task did not; fret not, this Task implementation also protects against stack overflows
  • in comparison with Scala's standard Future, a Task only describes an asynchronous computation; nothing is executed and no side-effects are triggered until Task.runAsync
  • the Scalaz Task executes its operators on a trampoline by default, making it very efficient, and in case we want to parallelize, that choice can be made explicit; this is in stark contrast with Scala's Future and actually in line with Monifu's Observable, making me very happy :-) ... I copied the idea, though not the implementation

What's better than Scalaz?

  • clean API, dirty but rather elegant and well encapsulated implementation: the internal design of Task follows that of Monifu's Observable and I must emphasise the better encapsulation - personally I feel there's no reason to expose the underlying Trampoline mechanics, because who knows, in the future I might not want to use a Trampoline
    • the Future in Scalaz is described as a "trampolined computation"; my Task by comparison is just an "asynchronous computation"
    • you don't get a guarantee that a Task will be executed on the current thread, or on a trampoline, like when using Task.delay; you only get a promise that the implementation will try its best (and in fact my Trampoline has a limited buffer and can start rejecting on execute, the fallback being on the thread-pool)
    • the other guarantee you get when executing my Task is that the stack won't blow up
    • the single-threaded performance of Monifu matches that of Scalaz, but when threads are forked, for some reason my implementation has more than twice the throughput, matching and even exceeding that of Scala's standard Future
    • in the short and flawed benchmarks I've made, my implementation has better RAM usage characteristics, which makes sense because of the dirty implementation
  • Monifu's Task always requires a Scheduler on execution (because it's not the user who gets to decide whether a scheduler or a thread-pool is needed)
    • as a consequence, Monifu's Task does not take an implicit thread-pool on builders such as Task.apply, or a scheduler on utilities such as Task#delay: Monifu's Task has clean builders that do not leak implementation details ;-)
  • Monifu has its own Scheduler (inherits from Scala's ExecutionContext, the equivalent of a ScheduledExecutorService), so we can work on Javascript and are not piggy-backing on Java's standard library
    • this implementation already runs on Scala.js and runs well
  • Monifu's Task integrates with Scala's Future, having a runAsync that returns a Future, which for what it does (e.g. memoization, callback management) is the best choice, plus it's standard
  • Monifu's Task has no def run: A or any other synchronous equivalent; a Task is considered to be asynchronous and hence it has no facilities for blocking the current thread. But we can rely on Scala's own facilities and do Await.result(task.runAsync, 10.seconds) (yes, that's the Await utility from the standard library), which is way cooler because we don't want to handle blocking ourselves (it's a platform dependent thing) and because Await.result is integrated with Scala's BlockContext, which has special benefits for our ForkJoin pool (blocking like the pros)

Usage

To answer a question, x1 and x2 will produce the same result and the same side-effects in both Monifu and Scalaz:

val x1 =
  for {
    _ <- Task { println("hi") }
    _ <- Task { println("hi") }
  } yield ()

val hi = Task { println("hi") }
val x2 =
  for {
    _ <- hi
    _ <- hi
  } yield ()

Creating a task (that is supposed to be executed on another logical thread) is similar:

// creating task for execution on a separate logical thread
val task = Task {
  println("Executing ...")
  "Hello, world!"
}

But there is a difference here, in signature:

// Scalaz
def apply[A](a: => A)(implicit pool: ExecutorService): Task[A]

// Monifu
def apply[T](f: => T): Task[T]

As a matter of philosophy, the difference is that in Monifu the execution of a Task is ALWAYS asynchronous, always, meaning that we expect tasks to fork at any moment (plus the trampoline is not unbounded and once we hit a limit, we fork), so it is on runAsync that we take our Scheduler. Also, Monifu is using Scheduler, an enhanced ExecutionContext that's also supported on Scala.js, whereas Scalaz relies on Java's ExecutorService.
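
In other words, the Scheduler only shows up at the point of execution. A sketch of the implied shape (signatures assumed for illustration, not copied from the Monifu sources):

// the Future-returning variant, shown later in this document
def runAsync(implicit s: Scheduler): Future[T]

// the callback variant
def runAsync(callback: Try[T] => Unit)(implicit s: Scheduler): Unit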

We can apply various operators on it. This code is valid with both:

// Yes, this uses a lot of memory (because of that map),
// but should not trigger stack overflows.
// Also, remove the map, and you've got a tail recursion.

def sum(n: Int, acc: Long = 0): Task[Long] = {
  if (n == 0) Task(acc) else
    Task(n).flatMap(x => sum(x-1, acc + x).map(_ + 1))
}
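
As an illustration of the stack-safety claim, a sketch (assuming the globalScheduler import used later in this document):

import monifu.concurrent.Implicits.globalScheduler

// roughly 100,000 nested flatMap/map calls; this should complete
// without throwing a StackOverflowError
val result = sum(100000).runAsync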

We can recover from an error (similar to Future.recover and Future.recoverWith; the same thing is possible with the Scalaz Task):

// Monifu
monifuTask.onErrorRecover {
  case _: SomeException => 
    "fallback value"
}

monifuTask.onErrorRecoverWith {
  case _: SomeException => 
    Task("fallback value")
}

We can delay execution:

// Monifu
task.delay(10.minutes)

// Scalaz
scalazTask.delay(10.minutes)

We can interrupt a task if it takes too long to execute:

// will fail with a `TimeoutException` after 10 seconds
monifuTask.timeout(10.seconds)

// will fallback to a backup after 10 seconds of inactivity
monifuTask.timeout(10.seconds, Task("another value"))

We can zip 2 tasks:

val a = Task(1)
val b = Task(2)

val c: Task[(Int, Int)] = a.zip(b)

We've got some utilities inspired by Scala's Future:

val taskOfList: Task[Seq[T]] = Task.sequence(listOfTasks)

val task: Task[T] = Task.firstCompletedOf(listOfTasks)

Contrary to Future, it doesn't execute immediately; just like its counterpart in Scalaz, it has to be executed explicitly before producing its result and its side-effects:

// Execution with Monifu (nothing has happened until now)
// Monifu's Task always requires an execution context on run
import monifu.concurrent.Implicits.globalScheduler
monifuTask.runAsync {
  case Success(value) => println(value)
  case Failure(ex) => ex.printStackTrace
}

// Execution with Scalaz
scalazTask.unsafePerformAsync {
  case -\/(ex) => ex.printStackTrace
  case \/-(value) => println(value)
}

Blocking the current thread for the result:

// in Monifu
Await.result(monifuTask.runAsync, 10.seconds)

// in Scalaz
scalazTask.unsafePerformSyncFor(10.minutes)

Monifu's method is actually better because we're piggybacking on Await.result from the standard library. This has the benefit that it communicates with Scala's (or soon to be Java's) ForkJoin pool, exposed by the default ExecutionContext, to maybe add more threads in case a blocking operation occurs (God forbid). It does so by means of the blocking block, in combination with Scala's BlockContext. And that's pretty cool.
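
For reference, the mechanism being referred to is the standard library's scala.concurrent.blocking, which signals the current BlockContext (e.g. the ForkJoin pool behind the global ExecutionContext) that the thread is about to block; Await.result applies it internally:

import scala.concurrent.blocking

// signals the BlockContext so the pool can compensate,
// e.g. by spawning an extra thread
blocking {
  Thread.sleep(1000)
}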

I'm also quite fond of my integration with Scala's Future. If you have to memoize the result in order to share it, then Future is there and does what it's supposed to. Plus it surely beats callbacks. So Monifu's Task has a runAsync that returns a Future:

import monifu.concurrent.Implicits.globalScheduler

val task: Task[T] = ???
val future: Future[T] = task.runAsync

// conversion works both ways
val anotherTask = Task.fromFuture(Future("hello"))

Implementation details

A Task models a producer/consumer relationship in which the producer pushes a single value to the consumer on success, or an exception on error and then stops. Thus we can have a consumer interface that mirrors the Observer in Rx, like so:

abstract class Callback[-T] { self =>
  def onSuccess(value: T): Unit
  def onError(ex: Throwable): Unit
  
  // might execute onSuccess on a trampoline, or not :-)
  def asyncOnSuccess(s: Scheduler, value: T, fork: Boolean = false): Unit = ???
  // might execute onError on a trampoline, or not :-)
  def asyncOnError(s: Scheduler, ex: Throwable, fork: Boolean = false): Unit = ???
}
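
Just to show the shape of this contract, a hypothetical Callback instance (not part of the actual code) could look like this:

val printer = new Callback[String] {
  def onSuccess(value: String): Unit = println(s"got: $value")
  def onError(ex: Throwable): Unit = ex.printStackTrace()
}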

This type is not meant for users, but for people implementing operators or other utilities. The contract is this:

  • on success the producer calls onSuccess exactly once, then stops
  • on error the producer calls onError exactly once, then stops
  • in operators, we should never call onSuccess or onError directly, as that can trigger stack overflows, but rather we should use the Trampoline or fork a new thread
  • the contract is not meant for users, but for implementers of operators, or people using Task.unsafeCreate

So we can model our task:

/**
 * For modeling asynchronous computations.
 */
trait Task[+T] { self =>
  /**
   * Characteristic function for our [[Task]].
   * Everything gets implemented based on this function.
   */
  protected def unsafeRunFn(scheduler: Scheduler, callback: Callback[T]): Unit

  // SAMPLE OPERATOR
  def map[U](f: T => U): Task[U] =
    Task.unsafeCreate[U] { (s,cb) =>
      self.unsafeRunFn(s, new Callback[T] {
        def onError(ex: Throwable): Unit =
          cb.asyncOnError(s, ex)

        def onSuccess(value: T): Unit =
          try {
            val u = f(value)
            cb.asyncOnSuccess(s, u)
          } catch {
            case NonFatal(ex) =>
              cb.asyncOnError(s, ex)
          }
      })
    }
    
  // ...
}

object Task {
  /** Returns a new task that, when executed, will emit the
    * result of the given function executed asynchronously.
    */
  def apply[T](f: => T): Task[T] =
    Task.unsafeCreate { (scheduler, callback) =>
      scheduler.execute(
        new Runnable {
          def run(): Unit = {
            // protecting against user code
            try callback.onSuccess(f) catch {
              case NonFatal(ex) =>
                callback.onError(ex)
            }
          }
        })
    }

  /** Builder for [[Task]] instances. For usage on implementing
    * operators or builders. Only use if you know what you're doing.
    */
  def unsafeCreate[T](f: (Scheduler, Callback[T]) => Unit): Task[T] =
    new Task[T] {
      override def unsafeRunFn(s: Scheduler, cb: Callback[T]): Unit =
        f(s,cb)
    }
}
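
As a small illustration, other builders can be expressed on top of unsafeCreate and the trampolining helpers; here is a hypothetical now builder (not part of the gist's code) that respects the contract described above:

/** Hypothetical builder: lifts an already computed value into a Task.
  * It goes through asyncOnSuccess instead of calling onSuccess directly,
  * so long chains of such tasks remain stack safe.
  */
def now[T](value: T): Task[T] =
  Task.unsafeCreate[T] { (s, cb) =>
    cb.asyncOnSuccess(s, value)
  }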

I'm actually hiding the implementation details of the trampoline (under the method calls asyncOnSuccess and asyncOnError). Because they don't matter. That's my OOP-ridden childhood speaking :-)

@milessabin

Could you put together a side-by-side comparison between the design you're proposing here and Scalaz's Task? Something short, just bullet points with a few code snippets to illustrate usage.

@alexandru (author)

Yes, I will. I'm currently fleshing out the implementation, since this didn't exist yesterday morning and after that I'll provide a comparison :-)

@alexandru (author)

Updated :-)

@tpolecat

Thanks for this. Very interesting.

Can you clarify whether these programs are the same or not?

val x =
  for {
    _ <- Task { println("hi") }
    _ <- Task { println("hi") }
  } yield ()

and

val hi = Task { println("hi") }
val x =
  for {
    _ <- hi
    _ <- hi
  } yield ()

@adelbertc

Depending on how the final implementation looks, perhaps you may also want to see how suitable it would be for inclusion in Cats itself? :-)

That being said, we do have a WIP branch here - I believe the progress stopped at making it stack safe, with tests showing it StackOverflowError-ing.

@alexandru (author)

@tpolecat yes, it behaves like you'd expect, the two programs are the same.

@adelbertc sure; btw, this implementation is stack safe

@ngbinh commented Dec 23, 2015

looks great! 👍

ghost commented Dec 23, 2015

@tpolecat re question from cats gitter: "is there a better place to discuss this?"

@adelbertc re WIP branch

Both points were brought up by @non in https://github.com/non/cats/issues/32#issuecomment-141079117, but that was a while ago. Specifically:

I think using this issue to brainstorm, ask questions, and help try to flesh out the requirements of a task in Cats would definitely be helpful.

and

Thus, feature/task (in its current state) is not eligible to be merged.

Just as a suggestion, perhaps the old issue has got stale and rather than try and reboot it, it might be worth launching a V2 issue?

@alexandru (author)

👍 @inthenow.

I have updated the document. It now contains a list of what this implementation does better than the Task in Scalaz, as I think @milessabin wanted. See the "What's better than Scalaz?" section.

ghost commented Dec 23, 2015

Cool. Before I/we do this, there are two issues regarding moving the current Future instances from Cats to Alleycats - non/alleycats#26 and https://github.com/non/cats/issues/589.

Given this - would Alleycats be the better place to open the new issue and possible first implementation?

@alexandru (author)

Changed again (Dec 30).

@puffnfresh

scalaz.concurrent.Task is not very good. Using just a reasonable IO type, it should be possible to subsume Task's use cases with some uses of MVar-like atomic-references. For example:

scalaz/scalaz@series/7.3.x...puffnfresh:feature/concurrent-io

I highly recommend focusing on doing a good job at I/O and then using that to talk about concurrency, rather than trying to do the opposite.

@gpampara

@puffnfresh: 👍

@aloiscochard

Hi all,

You might be interested in looking at my matterhorn experiment: https://github.com/aloiscochard/matterhorn

It is basically a specialized interpreter for IO actions: https://github.com/aloiscochard/matterhorn/blob/master/core/src/main/scala/rts/Interpreter.scala

There are some tests and benchmarks showing how it works:
https://github.com/aloiscochard/matterhorn/blob/master/core/src/test/scala/CoreSpec.scala
https://github.com/aloiscochard/matterhorn/blob/master/bench/src/main/scala/pure.scala

The approach to concurrency is to use an STM; I just plugged in a library for rapid prototyping.

Good luck

@alexandru (author)

@aloiscochard, @puffnfresh thanks, will take a look.

@pchiusano

scalaz.concurrent.Task is not very good. Using just a reasonable IO type, it should be possible to subsume Task's use cases with some uses of MVar-like atomic-references. For example:

I agree, and that is the route we are going with FS2's Task type (which I suppose could be renamed to 'IO'), which implements MVar-like references and then uses that to implement async (and lots of other operations, like parallelTraverse, as well as all the concurrent stream operations of FS2 itself): https://github.com/functional-streams-for-scala/fs2/blob/topic/redesign/core/src/main/scala/fs2/Async.scala#L75
