Forked from lancegatlin/gist:ffb64667cfb286d98686
Created
September 12, 2014 02:22
-
-
Save carter437/ea2aa3a1939f74fcb8a7 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Ready-made instance of [[FutureOps]] so the helpers can be imported instead of mixed in. */
object FutureOps extends FutureOps

/** Helpers for racing a group of futures and picking the first acceptable result. */
trait FutureOps {

  /**
   * Races `futures` and yields the first successfully completed value accepted by `predicate`.
   *
   * Outcomes of the returned future:
   *  - `Some(a)` as soon as any input future succeeds with a value `a` for which `predicate(a)` is true;
   *  - `None` once every input has completed, at least one of them succeeded, and none matched
   *    (also returned immediately when `futures` is empty);
   *  - a failed future carrying a `RuntimeException` (with every stack trace in its message) when
   *    every input future failed.
   *
   * When the result is not the "all failed" case, each input failure is passed to `ec.reportFailure`
   * after all inputs have completed, so that failures are not silently dropped.
   *
   * If `predicate` itself throws, the exception propagates out of the completion callback (where the
   * execution context handles it) and the value is treated as a non-match; the input is still counted
   * as completed so the returned future cannot hang.
   *
   * @param futures   the futures to race; traversed exactly once
   * @param predicate acceptance test applied to each successful value
   * @param ec        execution context used for callbacks and for reporting failures
   * @tparam A        value type of the input futures
   * @return a future of the first matching value, if any
   */
  def takeFirstMatch[A](futures: Traversable[Future[A]], predicate: A => Boolean)(implicit ec: ExecutionContext): Future[Option[A]] = {
    // Snapshot once: the input may be lazy (re-evaluated on every traversal), a Set (which would
    // collapse equal outcomes when mapped), or a List (O(n) size). Everything below works on
    // this fixed, ordered copy.
    val all = futures.toVector
    if (all.isEmpty) {
      Future.successful(None)
    } else {
      val total = all.size
      val promise = Promise[Option[A]]()
      val completedCount = new java.util.concurrent.atomic.AtomicInteger(0)
      val allDoneLatch = Promise[Unit]()

      def maybeAllDone(): Unit = {
        // Check-and-act is safe here: incrementAndGet hands the final count to exactly one caller.
        if (completedCount.incrementAndGet() == total) {
          allDoneLatch.success(())
        }
      }

      // Runs only after every input has completed, so each `value` below is defined.
      allDoneLatch.future.foreach { _ =>
        val exceptions = all.flatMap(_.value).collect { case scala.util.Failure(ex) => ex }
        if (exceptions.size < total) {
          // At least one input succeeded: either a match already completed the promise (the
          // trySuccess below is then a no-op) or nothing matched and the answer is None.
          exceptions.foreach(ex => ec.reportFailure(ex))
          promise.trySuccess(None)
        } else {
          val traces = exceptions.toList.zipWithIndex.map { case (e, i) => s"Exception ${i} ${ExceptionUtils.getStackTrace(e)}" }
          promise.tryFailure(new RuntimeException(s"All Futures Failed - StackTraces: ${traces}"))
        }
      }

      all.foreach { future =>
        future.onComplete { outcome =>
          try {
            outcome match {
              case Success(a) if predicate(a) => promise.trySuccess(Some(a))
              case _                          => ()
            }
          } finally {
            // Always count this input, even if the predicate threw; otherwise the latch would
            // never trip and the returned future would never complete.
            maybeAllDone()
          }
        }
      }

      promise.future
    }
  }

  /**
   * Races `futures` and yields the value of the first one to succeed; equivalent to
   * `takeFirstMatch` with a predicate that accepts everything.
   *
   * NOTE(review): this method declares no implicit `ExecutionContext` of its own, so one must be
   * in scope where this trait is compiled — confirm against the file's imports.
   *
   * @param futures the futures to race
   * @tparam A      value type of the input futures
   * @return a future of the first successful value, `None` for empty input, or a failed future
   *         when every input failed
   */
  def takeFirst[A](futures: Traversable[Future[A]]): Future[Option[A]] = takeFirstMatch(futures, { _: A => true })
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment