// Source: GitHub Gist joost-de-vries/9ec032c9765e658115f6cbce51d4b41d
// (GitHub page navigation text that was captured along with the gist has been removed.)
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
import scala.util.control.NonFatal
/**
 * Failure reported for an item when the `until` predicate passed to
 * `AsyncProcessing.SeqExtension.mapSequentially` returned `true` before that item was processed.
 */
class CanceledException extends Exception("Sequential processing was canceled")

/**
 * Asynchronous, strictly sequential helpers for `Seq`.
 *
 * `mapSequentially` is comparable to `Future.sequence`, with two differences:
 * `Future.sequence` works on futures that are already running and, in case of failure, returns
 * only the failure. `mapSequentially` processes the items one after another (non-blocking),
 * stops at the first failure and returns that failure together with all results obtained up to
 * that moment.
 */
object AsyncProcessing {

  /**
   * Carries the results collected so far, the failing item and the original failure (as `cause`)
   * out of the fold inside `mapSequentially`.
   *
   * It is an implementation detail of `mapSequentially`; it is not declared private only to stay
   * source compatible with existing code.
   */
  class WithResultsException[A, B](cause: Throwable, val results: Seq[B], val failingItem: A)
    extends Exception(cause)

  implicit class SeqExtension[A](s: Seq[A]) {

    /**
     * Do a foldLeft asynchronously. The signature is like the standard foldLeft, with the
     * difference that the function argument is asynchronous.
     *
     * `f` is invoked for an item only after the future produced for the previous item has
     * completed successfully; once a step fails, `f` is not invoked for the remaining items and
     * the returned future carries that failure.
     *
     * @param initial the accumulator value used for the first item
     * @param f       combines the accumulator with the next item
     * @param ec      executionContext
     * @tparam B the accumulator type
     * @return the final accumulator, or the first failure
     */
    def foldLeftAsync[B](initial: B)(f: (B, A) => Future[B])(implicit ec: ExecutionContext): Future[B] =
      s.foldLeft(Future.successful(initial)) { (accumulated, item) =>
        accumulated.flatMap(acc => f(acc, item))
      }

    /**
     * Applies `f` to the items one at a time, in order, and collects the results.
     *
     * Processing stops at the first item that fails. An item fails when the future returned by
     * `f` fails, when `f` (or `until`) throws a non-fatal exception synchronously, or when
     * `until` returns `true` (the failure is then a [[CanceledException]]).
     *
     * @param f        the function to call for each item
     * @param until    checked before each item; processing will proceed until this function returns true
     * @param progress called with the item's index and outcome after each item that has been
     *                 processed (including the failing or canceled one), and before the next
     *                 item is started. Exceptions thrown by `progress` do not affect the result.
     * @param ec       executionContext
     * @tparam B the type of the result of function f
     * @return a tuple of 1. the successful results, 2. the failure, if any, with the argument
     *         that resulted in the failure.
     */
    def mapSequentially[B]( // scalastyle:ignore
      f: A => Future[B],
      until: () => Boolean = () => false,
      progress: (Int, Try[B]) => Unit = (seqNr: Int, outcome: Try[B]) => ()
    )(implicit ec: ExecutionContext): Future[(Seq[B], Option[(A, Throwable)])] = {
      val none: Option[(A, Throwable)] = None
      // Vector keeps the repeated appends below cheap; the default Seq (List) appends in O(n).
      val noResults: Seq[B] = Vector.empty[B]

      s.zipWithIndex.foldLeftAsync(noResults) {
        case (collected, (item, index)) =>
          // A synchronous throw is turned into a failed future so that it is reported with the
          // collected results and the failing item, exactly like an asynchronous failure.
          val outcome: Future[B] =
            try {
              if (until()) Future.failed(new CanceledException) else f(item)
            } catch {
              case NonFatal(e) => Future.failed(e)
            }

          outcome
            // andThen (rather than onComplete) makes the callback part of the chain: it has run
            // before the next item starts and before the overall future completes.
            .andThen { case result => progress(index, result) }
            .map(collected :+ _)
            .recoverWith {
              case NonFatal(e) =>
                Future.failed(new WithResultsException[A, B](e, collected, failingItem = item))
            }
      }
        .map(results => (results, none))
        .recover {
          // Type arguments are erased at runtime; only mapSequentially creates this exception.
          case e: WithResultsException[A @unchecked, B @unchecked] =>
            (e.results, Some((e.failingItem, e.getCause)))
        }
    }
  }
}
// (GitHub page footer: sign up or sign in on GitHub to comment on this gist.)