Last active
December 16, 2015 07:48
-
-
Save ericacm/5401303 to your computer and use it in GitHub Desktop.
Adapted from http://stackoverflow.com/questions/16020964/cancellation-with-future-with-promise-in-scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Factories for futures that the caller can cancel.
 *
 * Each factory returns the future together with a cancel function. Calling the
 * cancel function tries to complete the future with a `CancellationException`;
 * it returns `true` only when that attempt is the one that completed the future
 * (i.e. the computation had not already finished).
 */
trait FutureCancelSupport {

  /**
   * Runs `fun` asynchronously with cooperative cancellation.
   *
   * `fun` is handed the very future that is returned to the caller, so it can
   * poll `isCompleted` to find out that it has been cancelled. Nothing forces
   * `fun` to stop: if it never checks, it keeps running after cancellation and
   * its result is simply discarded.
   *
   * @param fun the computation; receives the returned future as a cancellation signal
   * @param ex  execution context on which `fun` is run
   * @return the future and a cancel function
   */
  def cancellableFuture[T](fun: Future[T] => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
    val p = Promise[T]()
    val f = p.future
    val funFuture = Future(fun(f))
    // Whichever happens first -- normal completion or cancel() -- wins the promise.
    funFuture.onComplete(p tryComplete(_)) // Akka 2.0
    // p tryCompleteWith funFuture // Scala 2.10
    (f, () => p.tryComplete(Left(new CancellationException))) // Akka 2.0
    // (f, () => p.tryFailure(new CancellationException)) // Scala 2.10
  }

  /**
   * Runs `fun` asynchronously; cancelling interrupts the thread that is executing it.
   *
   * The executing thread is published in an `AtomicReference` for the duration
   * of `fun`. The cancel function takes the thread out of the reference and
   * interrupts it, then tries to fail the returned future with a
   * `CancellationException`.
   *
   * @param fun the (typically blocking) computation
   * @param ex  execution context on which `fun` is run
   * @return the future and a cancel function
   */
  def interruptableFuture[T](fun: () => T)(implicit ex: ExecutionContext): (Future[T], () => Boolean) = {
    val p = Promise[T]()
    val f = p.future
    val aref = new AtomicReference[Thread](null)
    val funFuture = Future {
      val thread = Thread.currentThread
      aref.synchronized { aref.set(thread) }
      try fun() finally {
        // cancel() empties the reference and interrupts while holding the same
        // monitor, so finding the reference empty here means the interrupt has
        // already been delivered to this thread.
        val wasInterrupted = (aref.synchronized { aref getAndSet null }) ne thread
        // Clear the flag so the interrupt cannot leak into whatever task the
        // execution context runs next on this (usually pooled) thread.
        if (wasInterrupted) Thread.interrupted()
      }
    }
    funFuture.onComplete(p tryComplete(_)) // Akka 2.0
    // p tryCompleteWith funFuture // Scala 2.10
    (f, () => {
      // Interrupt inside the monitor so the worker cannot leave its finally
      // block (and be reused) between the getAndSet and the interrupt.
      aref.synchronized { Option(aref getAndSet null) foreach { _.interrupt() } }
      p.tryComplete(Left(new CancellationException)) // Akka 2.0
      // p.tryFailure(new CancellationException) // Scala 2.10
    })
  }
}
/**
 * Exercises both factories of `FutureCancelSupport`: each one is cancelled once
 * while still running and once after it has already completed.
 *
 * The tests are timing based (they sleep for whole seconds), and all futures run
 * on a single-threaded executor that is shut down after the suite.
 */
class TestFutureCancelSupport extends FunSuite with FutureCancelSupport with BeforeAndAfterAll {
  // import ExecutionContext.Implicits.global // Scala 2.10
  val exec = Executors.newSingleThreadExecutor() // Akka 2.0
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(exec) // Akka 2.0

  override def afterAll(): Unit = {
    exec.shutdownNow() // Akka 2.0
  }

  /** Builds a cancellable future that counts one tick per second, up to `loopSec` ticks. */
  def createCancellableFuture(loopSec: Int): (Future[Int], () => Boolean) = cancellableFuture[Int](future => {
    var i = 0
    // isCompleted acts as our interrupted-flag. Will loop for loopSec seconds unless cancelled.
    while (!future.isCompleted && i < loopSec) {
      println(i)
      Thread.sleep(1000)
      i += 1
    }
    i // Completed
  })

  test("cancellableFuture was cancelled") {
    val (f, cancel) = createCancellableFuture(loopSec = 2)
    Thread.sleep(1000)
    val wasCancelled = cancel()
    assert(wasCancelled, "future should be cancelled")
    intercept[CancellationException] {
      Await.result(f, 1.second)
    }
  }

  test("cancellableFuture was not cancelled") {
    val loopSec = 1
    val (f, cancel) = createCancellableFuture(loopSec)
    // Sleep past the loop's duration so the future completes before cancel() is called.
    Thread.sleep(2000)
    val wasCancelled = cancel()
    assert(!wasCancelled, "future should not be cancelled")
    assert(Await.result(f, 1.second) === loopSec)
  }

  /** Builds an interruptable future that blocks on a latch for `blockSec` seconds, then yields `result`. */
  def createInterruptableFuture(blockSec: Int, result: Int): (Future[Int], () => Boolean) = interruptableFuture[Int] { () =>
    val latch = new CountDownLatch(1)
    // Latch will time out after blockSec unless interrupted
    latch.await(blockSec, TimeUnit.SECONDS)
    println("latch timed out")
    result // Completed
  }

  test("interruptableFuture was interrupted") {
    val result = 42
    val (f, cancel) = createInterruptableFuture(blockSec = 2, result)
    Thread.sleep(1000)
    val wasInterrupted = cancel()
    assert(wasInterrupted, "future should be interrupted")
    intercept[CancellationException] {
      Await.result(f, 1.second)
    }
  }

  test("interruptableFuture was not interrupted") {
    val result = 42
    val (f, cancel) = createInterruptableFuture(blockSec = 1, result)
    // Sleep past the latch timeout so the future completes before cancel() is called.
    Thread.sleep(2000)
    val wasInterrupted = cancel()
    assert(!wasInterrupted, "future should not be interrupted")
    val res = Await.result(f, 1.second)
    assert(res === result)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment