Skip to content

Instantly share code, notes, and snippets.

@ericacm
Last active December 16, 2015 07:48
Show Gist options
  • Save ericacm/5401303 to your computer and use it in GitHub Desktop.
Save ericacm/5401303 to your computer and use it in GitHub Desktop.
/**
 * Mix-in that builds futures which the caller can cancel after they have been started.
 *
 * The code targets the Akka 2.0 future API; the Scala 2.10 equivalent of each
 * API-specific line is kept next to it as a comment.
 */
trait FutureCancelSupport {

  /**
   * Starts `fun` on `ex` and returns the future of its result together with a cancel function.
   *
   * Cancellation is cooperative: `fun` receives the very future that is handed back to the
   * caller and can poll `isCompleted` on it to find out that it should stop. Nothing in this
   * method stops `fun` by force.
   *
   * @param fun the computation; its argument is the future returned to the caller
   * @param ex  execution context `fun` is scheduled on
   * @tparam T  result type of the computation
   * @return the result future and a cancel function. The cancel function tries to complete
   *         the future with a `CancellationException` and returns `true` only if that
   *         attempt is what completed it.
   */
  def cancellableFuture[T](fun: Future[T] => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
    val p = Promise[T]()
    val f = p.future
    val funFuture = Future(fun(f))
    // Whichever comes first - the computation finishing or cancel() - completes the promise.
    funFuture.onComplete(p tryComplete(_)) // Akka 2.0
    // p tryCompleteWith funFuture // Scala 2.10
    (f, () => p.tryComplete(Left(new CancellationException))) // Akka 2.0
    // (f, () => p.tryFailure(new CancellationException)) // Scala 2.10
  }

  /**
   * Starts `fun` on `ex` and returns the future of its result together with a cancel function
   * that interrupts the thread running `fun`.
   *
   * The cancel function only has a thread to interrupt while `fun` is executing; if it is
   * called before `fun` has started, the returned future is still failed but `fun` is not
   * prevented from running later.
   *
   * @param fun the (typically blocking) computation to run
   * @param ex  execution context `fun` is scheduled on
   * @tparam T  result type of the computation
   * @return the result future and a cancel function. The cancel function interrupts the
   *         worker thread if `fun` is currently running, then tries to complete the future
   *         with a `CancellationException`; it returns `true` only if that attempt is what
   *         completed the future.
   */
  def interruptableFuture[T](fun: () => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
    val p = Promise[T]()
    val f = p.future
    // Holds the thread currently executing `fun`, or null when there is nothing to interrupt.
    // All accesses go through aref's monitor so that "take the thread and interrupt it" in
    // cancel() is atomic with respect to the worker deregistering itself.
    val aref = new AtomicReference[Thread](null)
    val funFuture = Future {
      val thread = Thread.currentThread
      aref.synchronized { aref.set(thread) }
      try fun() finally {
        // cancel() nulls the reference and interrupts under the same monitor, so finding
        // anything other than our own thread here means the interrupt was already delivered.
        val wasInterrupted = (aref.synchronized { aref getAndSet null }) ne thread
        if (wasInterrupted) {
          // `fun` may have returned without consuming the interrupt. Clear the flag so it
          // does not leak into whatever this pooled thread executes next.
          Thread.interrupted()
        }
      }
    }
    funFuture.onComplete(p tryComplete(_)) // Akka 2.0
    // p tryCompleteWith funFuture // Scala 2.10
    (f, () => {
      aref.synchronized { Option(aref getAndSet null) foreach { _.interrupt() } }
      p.tryComplete(Left(new CancellationException)) // Akka 2.0
      // p.tryFailure(new CancellationException) // Scala 2.10
    })
  }
}
/**
 * ScalaTest suite for the two factory methods mixed in from FutureCancelSupport.
 *
 * Each helper is checked twice: once cancelling while the work is still in progress and
 * once cancelling after the work has already produced its result.
 */
class TestFutureCancelSupport extends FunSuite with FutureCancelSupport with BeforeAndAfterAll {
  // import ExecutionContext.Implicits.global // Scala 2.10
  val exec = Executors.newSingleThreadExecutor() // Akka 2.0
  implicit val ec = ExecutionContext.fromExecutor(exec) // Akka 2.0

  /** Stops the executor backing `ec` once every test has run. */
  override def afterAll(): Unit = {
    exec.shutdownNow() // Akka 2.0
  }

  /**
   * Builds a cancellable future that counts one-second ticks.
   *
   * The future handed to the body doubles as the stop signal: counting ends as soon as it
   * is completed, or after `loopSec` ticks, and the number of ticks counted is the result.
   */
  def createCancellableFuture(loopSec: Int) = cancellableFuture[Int] { resultFuture =>
    def tick(elapsed: Int): Int =
      if (resultFuture.isCompleted || elapsed >= loopSec) {
        elapsed // Completed
      } else {
        println(elapsed)
        Thread.sleep(1000)
        tick(elapsed + 1)
      }

    tick(0)
  }

  test("cancellableFuture was cancelled") {
    val (pending, cancelIt) = createCancellableFuture(loopSec = 2)
    Thread.sleep(1000)
    val cancelled = cancelIt()
    assert(cancelled, "future should be cancelled")
    intercept[CancellationException] {
      Await.result(pending, 1.second)
    }
  }

  test("cancellableFuture was not cancelled") {
    val loopSec = 1
    val (finished, cancelIt) = createCancellableFuture(loopSec)
    Thread.sleep(2000)
    val cancelled = cancelIt()
    assert(!cancelled, "future should not be cancelled")
    val ticks = Await.result(finished, 1.second)
    assert(ticks === loopSec)
  }

  /**
   * Builds an interruptable future that blocks on a latch nobody counts down, so it waits
   * for `blockSec` seconds unless interrupted, then prints a message and yields `result`.
   */
  def createInterruptableFuture(blockSec: Int, result: Int) = interruptableFuture[Int] { () =>
    // The await can only end by timing out or by the thread being interrupted.
    new CountDownLatch(1).await(blockSec, TimeUnit.SECONDS)
    println("latch timed out")
    result // Completed
  }

  test("interruptableFuture was interrupted") {
    val result = 42
    val (pending, interruptIt) = createInterruptableFuture(blockSec = 2, result)
    Thread.sleep(1000)
    val interrupted = interruptIt()
    assert(interrupted, "future should be interrupted")
    intercept[CancellationException] {
      Await.result(pending, 1.second)
    }
  }

  test("interruptableFuture was not interrupted") {
    val result = 42
    val (finished, interruptIt) = createInterruptableFuture(blockSec = 1, result)
    Thread.sleep(2000)
    val interrupted = interruptIt()
    assert(!interrupted, "future should not be interrupted")
    assert(Await.result(finished, 1.second) === result)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment