@jehrhardt
Created October 20, 2015 12:45

Futures

Dealing with Scala's futures.

Basics

Futures integrate well into the language. We know how to create an Option, and creating a Future should work the same way.

import scala.concurrent._

Some(1)
Future(1)

But this does not compile: the implicit execution context is missing!

import ExecutionContext.Implicits.global

Future(1)

The body of a future is passed by name, so it is evaluated later on the execution context and both notations work:

Future(1 + 1)

Future {
  1 + 1
}

Reading the value of a future

val i = Future(1)
i.value

value is non-blocking and returns the current state of the future as an Option[Try[T]]. While the future is not completed, it returns None. Blocking access requires Await:

import scala.concurrent._
import scala.concurrent.duration._
import scala.language.postfixOps // enables the `1 second` notation on newer Scala versions

val i = Future(1)

Await.ready(i, 1 second)
i.value
Await.result(i, 1 second)
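To make the two states of value visible without depending on timing, here is a minimal sketch that completes the future by hand through a Promise (promises are covered further down). The names ValueDemo and states are made up for this example.

```scala
import scala.concurrent.Promise
import scala.util.{Success, Try}

object ValueDemo {
  // Returns what `value` reports before and after the future is completed.
  def states(): (Option[Try[Int]], Option[Try[Int]]) = {
    val p = Promise[Int]()   // lets us decide when the future completes
    val f = p.future
    val before = f.value     // None: not completed yet
    p.success(1)
    val after = f.value      // Some(Success(1)): completed with 1
    (before, after)
  }
}
```

Because value wraps the outcome in a Try, a failed future would show up as Some(Failure(...)) instead of throwing.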

Futures support the typical methods such as map or withFilter:

val j = i map {
  case n if n > 0 => n * 2
  case _ => 0
}

Await.result(j, 1 second)

Alternatively, using withFilter:

val j = i.withFilter(_ > 0).map(_ * 2)
Await.result(j, 1 second)

Even for comprehensions work:

val j = for {
  n <- i
  if n > 0
} yield n * 2
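One caveat worth checking: the map variant with a fallback case and the filtered variants are not equivalent when the value does not pass the filter. The sketch below, with made-up names FilterDemo, withFallback and withGuard, compares both for a future holding -1. The guard in a for comprehension is rewritten to withFilter, and a future whose value fails the filter ends up failed with a NoSuchElementException.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FilterDemo {
  // map with a fallback case always produces a value
  def withFallback(f: Future[Int]): Future[Int] = f.map {
    case n if n > 0 => n * 2
    case _ => 0
  }

  // the guard desugars to withFilter; a rejected value fails the future
  def withGuard(f: Future[Int]): Future[Int] = for {
    n <- f
    if n > 0
  } yield n * 2

  // Runs both variants on a future that holds -1.
  def demo(): (Int, Try[Int]) = {
    val negative = Future.successful(-1)
    val mapped = Await.result(withFallback(negative), 1.second)
    val guarded = Try(Await.result(withGuard(negative), 1.second))
    (mapped, guarded)
  }
}
```

Here demo() should return 0 for the map variant and a Failure wrapping the NoSuchElementException for the guarded one.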

The execution context

A typical example is calling web services in parallel. The resulting sequence of futures should become a single future of a sequence:

val r = Seq(1, 1, 1, 1, 1, 1, 1).map(s => Future {
  import java.time._

  Thread.sleep(s * 1000)
  LocalTime.now()
})

val t = Future.sequence(r)
Await.result(t, 10 seconds)
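The map followed by Future.sequence can also be written in one step with Future.traverse. This is only a sketch: fetch is a hypothetical stand-in for a web service call and TraverseDemo is a made-up name.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TraverseDemo {
  // Hypothetical stand-in for a web service call.
  def fetch(id: Int): Future[String] = Future(s"response-$id")

  // Same outcome as Future.sequence(ids.map(fetch)); results keep the input order.
  def fetchAll(ids: Seq[Int]): Future[Seq[String]] =
    Future.traverse(ids)(fetch)

  // Blocks until all calls are done, as in the example above.
  def demo(): Seq[String] = Await.result(fetchAll(Seq(1, 2, 3)), 5.seconds)
}
```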

The default execution context is backed by a fork/join pool and, by default, sizes its parallelism to the number of available processors:

Runtime.getRuntime().availableProcessors()

Blocking calls should be wrapped in blocking, so that the pool can spawn additional threads:

val r = Seq(1, 1, 1, 1, 1, 1, 1).map(s => Future {
  import java.time._
  import scala.concurrent.blocking

  blocking {
    Thread.sleep(s * 1000)
    LocalTime.now()
  }
})

val t = Future.sequence(r)
Await.result(t, 10 seconds)

Blocking calls can result in many threads, which can be a huge problem. It is better to use a dedicated execution context:

import java.util.concurrent.Executors

implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))

val r = Seq(1, 1, 1, 1, 1, 1, 1).map(s => Future {
  import java.time._

  Thread.sleep(s * 1000)
  LocalTime.now()
})

val t = Future.sequence(r)
Await.result(t, 10 seconds)

The execution context is resolved implicitly by the compiler. Thus the call to Future.sequence() runs on the same execution context as the futures themselves. Additional work is needed to use different pools for different calls.
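One possible way to separate the pools is to pass the dedicated context explicitly to the blocking futures and keep the implicit one for everything else. The following is a sketch under assumptions: the name PoolsDemo, the pool size of 4 and the sleep time are arbitrary choices, not recommendations.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object PoolsDemo {
  // Runs blocking work on a dedicated pool and returns the thread names used.
  def run(): Seq[String] = {
    // Dedicated pool for blocking calls; the size 4 is an arbitrary assumption.
    val blockingEc: ExecutionContextExecutorService =
      ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))
    try {
      // Pass the pool explicitly, so only these futures run on it.
      val calls = Seq(1, 2, 3).map { n =>
        Future {
          Thread.sleep(100L * n)
          Thread.currentThread().getName
        }(blockingEc)
      }
      // Combining the results uses the global context, scoped to this block.
      import ExecutionContext.Implicits.global
      Await.result(Future.sequence(calls), 5.seconds)
    } finally blockingEc.shutdown() // fixed pools use non-daemon threads
  }
}
```

The returned names should all belong to the fixed pool (the JDK default thread factory names them pool-N-thread-M), while Future.sequence does its bookkeeping on the global context.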

Additional ways to use futures

Futures provide callbacks:

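import scala.util.{Failure, Success}

val i = Future(1)
var n = 0

i onComplete {
  case Success(m) => n = m
  case Failure(e) => e.printStackTrace() // handle failures too, so the match is exhaustive
}

Await.ready(i, 1 second)
n // the callback runs asynchronously, so n may still be 0 for a brief moment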

As with Either, a future can be projected to its failure:

val i: Either[String, Int] = Right(1)
for(n <- i.right) yield n

val j = Future(1)
val r = for(e <- j.failed) yield e
Await.result(r, 1 second) // throws NoSuchElementException, because j did not fail

Useful for tests: create futures that are already completed, either successfully or with a failure.

val i = Future.successful(1)
Await.result(i, 1 second)

val j = Future.failed(new RuntimeException("foo"))
Await.result(j, 1 second) // rethrows the RuntimeException

Promises allow the creation of futures whose values are filled in externally later.

val p = Promise[Int]()
val i = p.future

p.success(1)

Await.result(i, 1 second)
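A common use of promises is to adapt a callback-style API to futures. The sketch below assumes a hypothetical function legacyDivide that reports its outcome through a callback; it is invented for this example, as are the names PromiseDemo and divide.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object PromiseDemo {
  // Hypothetical callback-style API, invented for this example:
  // it reports its outcome by invoking the given callback.
  def legacyDivide(a: Int, b: Int)(callback: Try[Int] => Unit): Unit =
    callback(Try(a / b))

  // Adapter: create a promise, complete it from the callback, hand out its future.
  def divide(a: Int, b: Int): Future[Int] = {
    val p = Promise[Int]()
    legacyDivide(a, b)(result => p.complete(result))
    p.future
  }

  // Blocks for the result of a successful division.
  def demo(): Int = Await.result(divide(6, 3), 1.second)
}
```

A division by zero completes the promise with a Failure, so the resulting future fails instead of the exception escaping from the callback.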

Promises, as well as Future.successful and Future.failed, do not require an execution context. Callbacks such as onComplete do.
