Created
November 26, 2016 08:55
-
-
Save joost-de-vries/9ec032c9765e658115f6cbce51d4b41d to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import scala.concurrent.{ ExecutionContext, Future } | |
| import scala.util.{ Failure, Success, Try } | |
| import scala.util.control.NonFatal | |
/**
 * Signals that sequential processing was stopped on request: `mapSequentially` reports it as the
 * failure of the first item that was skipped because its `until` function returned true.
 *
 * Carries a fixed, descriptive message so that it is recognisable when it ends up in a log.
 */
class CanceledException extends Exception("Processing was canceled because the until condition returned true")
/**
 * Sequential, non-blocking alternatives to `Future.sequence`.
 *
 * `Future.sequence` combines futures that are effectively running in parallel and, when one of them
 * fails, yields only that failure. The operations defined here start the work for an item only after
 * the work for the previous item has completed, stop at the first failure, and return that failure
 * together with all results gathered up to that moment.
 */
object AsyncProcessing {

  /**
   * Carries the partial results of `mapSequentially` through a failed future. It is an implementation
   * detail of `mapSequentially`, which unwraps it again before returning.
   *
   * @param cause       the failure that stopped processing
   * @param results     the results of the items that were processed successfully before the failure
   * @param failingItem the item that was being processed when the failure occurred
   * @tparam A the item type
   * @tparam B the result type
   */
  class WithResultsException[A, B](cause: Throwable, val results: Seq[B], val failingItem: A) extends Exception(cause)

  implicit class SeqExtension[A](s: Seq[A]) {

    /**
     * Do a foldLeft asynchronously. The signature is like the standard foldLeft, with the difference
     * that the function argument is asynchronous: `f` is invoked for an item only once the future
     * produced for the previous item has completed successfully. When a step fails, `f` is not
     * invoked for the remaining items and the returned future fails with that step's failure.
     *
     * @param initial the accumulator value to start with
     * @param f       combines the accumulator so far with the next item
     * @param ec      executionContext
     * @tparam B the accumulator type
     * @return the final accumulator, or the first failure
     */
    def foldLeftAsync[B](initial: B)(f: (B, A) => Future[B])(implicit ec: ExecutionContext): Future[B] =
      s.foldLeft(Future.successful(initial)) { (accumulated, item) => accumulated.flatMap(f(_, item)) }

    /**
     * Applies `f` to the items one at a time, in order, and collects the results until the first
     * failure.
     *
     * A non-fatal exception thrown by `f` or `until` themselves is treated exactly like a failed
     * future: processing stops and the exception is returned alongside the results gathered so far.
     *
     * @param f        the function to call for each item
     * @param until    checked before each item; processing will proceed until this function returns true.
     *                 The item at which it returns true is not processed and is reported as failed with a
     *                 [[CanceledException]]
     * @param progress this function will be called after each item that has been processed (or canceled),
     *                 with the zero-based index of the item and its outcome. It is called before the next
     *                 item is started and before the returned future completes; it is used for its side
     *                 effect only
     * @param ec       executionContext
     * @tparam B the type of the result of function f
     * @return a tuple of 1. the successful results, 2. the failure, if any, with the argument that resulted in the failure.
     */
    def mapSequentially[B](f: A => Future[B], until: () => Boolean = () => false, progress: (Int, Try[B]) => Unit = (seqNr: Int, outcome: Try[B]) => ())(implicit ec: ExecutionContext): Future[(Seq[B], Option[(A, Throwable)])] = { // scalastyle:ignore
      val noFailure: Option[(A, Throwable)] = None
      // A Vector keeps the repeated append below cheap; callers still see a Seq[B].
      val noResults: Seq[B] = Vector.empty[B]

      s.zipWithIndex
        .foldLeftAsync(noResults) {
          case (collected, (item, index)) =>
            // Try turns an exception thrown while checking `until` or while starting `f` into a failed
            // future, so that it takes the same path as an asynchronous failure.
            val attempt: Future[B] =
              Try(if (until()) Future.failed[B](new CanceledException) else f(item)) match {
                case Success(started) => started
                case Failure(thrown) => Future.failed(thrown)
              }

            attempt
              // andThen (rather than onComplete) orders the callback before the next step of the fold.
              .andThen { case outcome => progress(index, outcome) }
              .map(collected :+ _)
              .recoverWith {
                // Smuggle the results so far and the offending item through the failed future.
                case NonFatal(e) => Future.failed(new WithResultsException[A, B](e, collected, failingItem = item))
              }
        }
        .map((_, noFailure))
        .recover {
          // The type arguments are erased at runtime; only this method creates the exception here.
          case e: WithResultsException[A @unchecked, B @unchecked] => (e.results, Some((e.failingItem, e.getCause)))
        }
    }
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment