Last active
December 13, 2015 19:48
-
-
Save danielhopkins/4964725 to your computer and use it in GitHub Desktop.
Timing out futures using Scala promises
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// We're trying to express a future that can time out, but I don't
// want to use Await and block the current thread. I need to return
// a future that either fails with a timeout or carries the value of the completed future.
import concurrent.ExecutionContext.Implicits._ | |
import concurrent.Future | |
import concurrent._ | |
import concurrent.duration._ | |
import akka.pattern.after | |
import scala.util.{Try, Success, Failure} | |
/** Name of the thread that evaluates the call; used to tag the log lines in this script. */
def t: String = Thread.currentThread().getName
// Actor system whose scheduler is handed to akka's `after` in this script.
// NOTE(review): nothing visible here shuts the system down — confirm whether
// the surrounding program is expected to terminate it.
val sys: akka.actor.ActorSystem = akka.actor.ActorSystem()
// akka's `after` seems to be the only way to do this without blocking:
// it hands back a future right away and only evaluates the given block
// once the delay `d` has elapsed on the supplied scheduler.
def timeout(d: FiniteDuration): Future[Nothing] = {
  // Evaluated by `after` when the delay is over: log, then fail the future.
  def expire: Future[Nothing] = {
    println(s"Timeout in $t")
    Future.failed(new TimeoutException)
  }
  after(d, using = sys.scheduler)(expire)
}
// Demo computation: logs, sleeps for one second, logs again, then yields 1.
// Declared lazy, so the future is only created (and started) the first
// time `f` is referenced.
//
// Uses `Future { ... }` rather than the lowercase `future { ... }` helper,
// which is deprecated and no longer available in newer Scala versions.
lazy val f: Future[Int] = Future {
  println(s"Sleeping future running in ${t}")
  // Mark the sleep as blocking so the execution context can compensate
  // for the stalled thread instead of silently losing a worker.
  blocking {
    Thread.sleep(1000)
  }
  println(s"Done sleeping future in ${t}")
  1
}
// The real function would simply return f2; it is bound to a name here so
// the outcome and the (non-)blocking behaviour can be observed.
// Whichever of the two futures completes first determines f2's result.
val f2: Future[Int] = {
  val deadline = timeout(200.millis)
  val contenders = Seq(deadline, f)
  Future.firstCompletedOf(contenders)
}
// Registering the handler does not block: the Thread.sleep inside it runs
// when the callback is executed, so this thread carries straight on.
f2.onComplete { outcome =>
  // Both outcomes pause for a second before reporting.
  Thread.sleep(1000)
  outcome match {
    case Success(r) => println(s"Success! $r in $t")
    case Failure(_) => println(s"Time out in $t")
  }
}
// This line should be reached before the future finishes,
// because run-main isn't blocked by the callback above.
println(s"Done with calls in ${t}")
// A little extension-method sugar so any Future can be given a timeout:
implicit class FutureSugar[T](f: Future[T]) {
  /** Races `f` against a timer that fails with a `TimeoutException`.
    *
    * If `f` completes first, its result is passed through. A
    * `TimeoutException` from the race is replaced by `defaultValue`; note
    * that this also applies to a `TimeoutException` that `f` itself fails
    * with, since the handler matches on the exception type only.
    *
    * @param timeout      delay after which the timer fires
    * @param defaultValue fallback result, evaluated only when a timeout is recovered
    * @return a future holding either `f`'s outcome or the fallback value
    */
  def orTimeout(timeout: FiniteDuration)(defaultValue: => T): Future[T] = {
    val expiry: Future[T] =
      after(timeout, using = sys.scheduler)(Future.failed(new TimeoutException))
    val winner = Future.firstCompletedOf(Seq(expiry, f))
    winner.recover { case _: TimeoutException => defaultValue }
  }
}
// Silly me, this is a little easier: let the timer itself succeed with
// the fallback instead of failing and recovering afterwards.
implicit class FutureSugar[T](f: Future[T]) {
  /** Races `f` against a timer that completes with `defaultValue`.
    *
    * @param timeout      delay after which the timer fires
    * @param defaultValue fallback result, evaluated by the timer's block
    * @return a future completed by whichever of `f` and the timer finishes first
    */
  def orTimeout(timeout: FiniteDuration)(defaultValue: => T): Future[T] = {
    val fallback: Future[T] = after(timeout, using = sys.scheduler) {
      Future.successful(defaultValue)
    }
    Future.firstCompletedOf(Seq(fallback, f))
  }
}
// With the sugar in scope, a timeout with a fallback is a single call:
val f2: Future[Int] = f.orTimeout(200.millis)(defaultValue = -1)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
I'm used to doing this by using the ActorSystem.scheduler to schedule a competing task that writes to a Promise - if the future completes first, Promise.trySuccess() gets that value, or if the scheduled task fires first, it puts a new TimeoutException into the Promise. https://gist.github.com/jamie-allen/4982333
Note that it's non-deterministic either way - no guarantees that the scheduler or after will fire at exactly the time you specify.