@pchiusano
Created July 16, 2013 03:57
Experimental, nonblocking implementation of Future.chooseAny. First thread to finish notifies listener.
// Assumed imports: scalaz 7's scalaz.concurrent.Future plus java.util.concurrent primitives.
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}
import scalaz.Free.Trampoline
import scalaz.concurrent.Future
import scalaz.concurrent.Future.Async

def chooseAny[A](h: Future[A], t: Seq[Future[A]]): Future[(A, Seq[Future[A]])] = {
  Async { cb =>
    // The details of this implementation are a bit tricky, but the general
    // idea is to run all of `h +: t` in parallel, returning whichever result
    // becomes available first.
    // To account for the fact that the losing computations are still
    // running, we construct special 'residual' Futures for the losers
    // that first return the result of the already running computation,
    // then fall back to running the original Future.
    val won = new AtomicBoolean(false) // threads race to set this
    val fs = (h +: t).view.zipWithIndex.map { case (f, ind) =>
      val used = new AtomicBoolean(false)  // has the value in `ref` been handed out?
      val latch = new CountDownLatch(1)    // released once `ref` has been set
      val ref = new AtomicReference[A]     // result of the already running computation
      val residual = Async { (cb: A => Trampoline[Unit]) =>
        // A bit of trickiness here, since two threads may listen to this
        // Async simultaneously, and we only want one to receive the value
        // inside `ref`. To ensure this, we race to set the `used` flag.
        // Whichever one wins gets the value inside `ref`; the other just
        // delegates to `f`.
        if (used.get) f.listen(cb)
        else {
          // Note: this blocks the listening thread until `f`'s running
          // computation has stored its result in `ref`.
          latch.await()
          if (used.compareAndSet(false, true))
            cb(ref.get).run
          else
            f.listen(cb)
        }
      }
      (f, residual, ind, latch, ref)
    }.toIndexedSeq
    fs.foreach { case (forig, fresidual, ind, latch, ref) =>
      forig.runAsync { a =>
        ref.set(a)
        latch.countDown() // unblocks any listeners on fresidual
        // If we're the first to finish, invoke `cb`, passing the residuals
        // of all the other Futures. Otherwise do nothing: another thread
        // will have already invoked `cb` with our residual.
        if (won.compareAndSet(false, true))
          cb((a, fs.collect { case (_, f, i, _, _) if i != ind => f })).run
      }
    }
  }
}
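
The race on `won` can be tried in isolation, without scalaz. The following is only a minimal sketch of that one idea, using plain `java.util.concurrent` threads. `FirstWinsSketch`, `raceAll`, and `fired` are hypothetical names introduced for illustration, not part of the gist or of scalaz, and the sketch does not build residual Futures for the losers.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}

object FirstWinsSketch {
  // Hypothetical helper: runs each task on its own pool thread and passes the
  // first finished result (with its index) to `cb`. Returns how many times
  // `cb` was invoked. Assumes `tasks` is non-empty.
  def raceAll[A](tasks: IndexedSeq[() => A])(cb: (A, Int) => Unit): Int = {
    val pool = Executors.newFixedThreadPool(tasks.size)
    val won = new AtomicBoolean(false)          // threads race to set this
    val fired = new AtomicInteger(0)            // counts callback invocations
    val allDone = new CountDownLatch(tasks.size)
    tasks.zipWithIndex.foreach { case (task, ind) =>
      pool.execute(new Runnable {
        def run(): Unit =
          try {
            val a = task()
            // Only the thread that flips `won` from false to true notifies
            // the listener; every later finisher skips the callback.
            if (won.compareAndSet(false, true)) {
              fired.incrementAndGet()
              cb(a, ind)
            }
          } finally allDone.countDown()
      })
    }
    // Wait (with a timeout) for all tasks so the returned count is final.
    allDone.await(10, TimeUnit.SECONDS)
    pool.shutdown()
    fired.get
  }
}
```

Calling `FirstWinsSketch.raceAll(IndexedSeq(() => 1, () => 2))((a, i) => println(s"task $i won with $a"))` invokes the callback for exactly one of the two tasks, whichever thread wins the `compareAndSet`. Unlike `chooseAny`, this sketch waits for the losers to finish and then discards their results.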