Created
July 16, 2013 03:57
-
-
Save pchiusano/6005653 to your computer and use it in GitHub Desktop.
Experimental, nonblocking implementation of Future.chooseAny. First thread to finish notifies listener.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def chooseAny[A](h: Future[A], t: Seq[Future[A]]): Future[(A, Seq[Future[A]])] = { | |
Async { cb => | |
// The details of this implementation are a bit tricky, but the general | |
// idea is to run all `fs` in parallel, returning whichever result | |
// becomes available first. | |
// To account for the fact that the losing computations are still | |
// running, we construct special 'residual' Futures for the losers | |
// that will first return from the already running computation, | |
// then revert back to running the original Future. | |
val won = new AtomicBoolean(false) // threads race to set this | |
val fs = (h +: t).view.zipWithIndex.map { case (f, ind) => | |
val used = new AtomicBoolean(false) | |
val latch = new CountDownLatch(1) | |
val ref = new AtomicReference[A] | |
val residual = Async { (cb: A => Trampoline[Unit]) => | |
// A bit of trickiness here, since two threads may listen to this | |
// Async simultaneously, and we only want one to receive the value | |
// inside `ref`. To ensure this, we race to set the `used` flag. | |
// Whichever one wins gets the value inside `ref`, the other just | |
// delegates to `f`. | |
if (used.get) f.listen(cb) | |
else { | |
latch.await | |
if (used.compareAndSet(false, true)) | |
cb(ref.get).run | |
else | |
f.listen(cb) | |
} | |
} | |
(f, residual, ind, latch, ref) | |
}.toIndexedSeq | |
fs.foreach { case (forig, fresidual, ind, latch, ref) => | |
forig.runAsync { a => | |
ref.set(a) | |
latch.countDown // unblocks any listeners on fresidual | |
// If we're the first to finish, invoke `cb`, passing residuals | |
if (won.compareAndSet(false, true)) | |
cb((a, fs.collect { case (_,f,i,_,_) if i != ind => f })).run | |
else { | |
// noop; another thread will have already invoked `cb` w/ our residual | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment