-
-
Save lancegatlin/ffb64667cfb286d98686 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** Singleton instance of the [[FutureOps]] trait, so its helpers can be used via `FutureOps.xyz` or `import FutureOps._`. */
object FutureOps extends FutureOps
/**
 * Helper operations for working with collections of [[scala.concurrent.Future]]s.
 */
trait FutureOps {

  /**
   * Lifts the outcome of a future into its value, so that the returned future
   * always succeeds and carries either `Success(a)` or `Failure(ex)`.
   *
   * @param self the future to observe
   * @param ec   execution context used to run the completion callback
   * @return a future that completes successfully with the `Try` outcome of `self`
   */
  def toTry[A](self: Future[A])(implicit ec: ExecutionContext): Future[Try[A]] = {
    val p = Promise[Try[A]]()
    self.onComplete(result => p.success(result))
    p.future
  }

  /**
   * Completes with the first successfully computed value that satisfies `predicate`.
   *
   *  - If `futures` is empty the result is `None`.
   *  - As soon as one future succeeds with a matching value the result is `Some(value)`.
   *  - If every future completes and none matched, the result is `None` as long as at
   *    least one future succeeded; if every future failed the result fails with a
   *    `RuntimeException` whose message contains all stack traces.
   *
   * Exceptions that do not end up in the returned future are passed to
   * `ec.reportFailure` once all futures have completed, so they are never silently lost.
   * A `predicate` that throws is treated as "no match" and its exception is reported
   * the same way.
   *
   * @param futures   the futures to race
   * @param predicate test applied to every successfully computed value
   * @param ec        execution context used for callbacks and failure reporting
   * @return a future of the first matching value, if any
   */
  def takeFirstMatch[A](futures: Traversable[Future[A]], predicate: A => Boolean)(implicit ec: ExecutionContext): Future[Option[A]] = {
    // Materialise once: the input may be lazy or single-pass and we need a stable size.
    val all = futures.toVector
    if (all.isEmpty) {
      Future.successful(None)
    } else {
      val promise = Promise[Option[A]]()
      val remaining = new java.util.concurrent.atomic.AtomicInteger(all.size)

      // A throwing predicate must not abort the callback, otherwise the countdown
      // below would never reach zero and the returned future would never complete.
      def matches(a: A): Boolean = {
        val outcome = Try(predicate(a))
        outcome.failed.foreach(e => ec.reportFailure(e))
        outcome.getOrElse(false)
      }

      // Runs exactly once, after every input future has completed and after every
      // callback has had its chance to complete `promise` with a match.
      def finish(): Unit = {
        val exceptions = all.flatMap(_.value.toList).flatMap(_.failed.toOption.toList)
        if (promise.isCompleted) {
          // Ensure any exceptions that happened aren't blackholed when this method succeeds
          exceptions.foreach(ex => ec.reportFailure(ex))
        } else if (exceptions.size < all.size) {
          // At least one future succeeded but its value didn't pass the predicate
          exceptions.foreach(ex => ec.reportFailure(ex))
          promise.trySuccess(None)
        } else {
          val traces = exceptions.zipWithIndex.map { case (e, i) => s"Exception ${i} ${stackTraceOf(e)}" }
          promise.tryFailure(new RuntimeException(s"All Futures Failed - StackTraces: ${traces}"))
        }
      }

      all.foreach { future =>
        future.onComplete { result =>
          result.foreach { a =>
            if (matches(a)) promise.trySuccess(Some(a))
          }
          // Count down only after the match attempt so finish() sees the final promise state.
          if (remaining.decrementAndGet() == 0) finish()
        }
      }
      promise.future
    }
  }

  /**
   * Completes with the value of the first future that succeeds.
   *
   * Equivalent to [[takeFirstMatch]] with a predicate that accepts every value.
   *
   * @param futures the futures to race
   * @param ec      execution context used for callbacks and failure reporting
   * @return a future of the first successfully computed value, if any
   */
  def takeFirst[A](futures: Traversable[Future[A]])(implicit ec: ExecutionContext): Future[Option[A]] =
    takeFirstMatch[A](futures, (_: A) => true)

  /** Renders the full stack trace of `e` (as printed by `printStackTrace`) into a string. */
  private def stackTraceOf(e: Throwable): String = {
    val buffer = new java.io.StringWriter()
    e.printStackTrace(new java.io.PrintWriter(buffer, true))
    buffer.toString
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment