Skip to content

Instantly share code, notes, and snippets.

@atamborrino
Last active August 29, 2015 14:13
Show Gist options
  • Save atamborrino/186db7fbabd0c44419d8 to your computer and use it in GitHub Desktop.
Save atamborrino/186db7fbabd0c44419d8 to your computer and use it in GitHub Desktop.
Akka stream helpers
/**
 * Helpers that build `mapAsync` / `mapAsyncUnordered` flows inside a `section`
 * whose input buffer is pinned to a caller-chosen size.
 */
object FlowHelper {

  /**
   * Input-buffer attributes with both `initial` and `max` set to `size`.
   *
   * NOTE(review): any constraints on valid sizes (e.g. positivity) are enforced
   * by Akka Streams, not here -- confirm against the Akka version in use.
   */
  private def fixedInputBuffer(size: Int) =
    OperationAttributes.inputBuffer(initial = size, max = size)

  /**
   * Applies `f` via `mapAsyncUnordered` inside a section whose input buffer has
   * `initial` and `max` both equal to `parallelism`.
   *
   * NOTE(review): per the method name, the intent is presumably to cap the number
   * of concurrently running futures; that relies on how the stage consumes its
   * input buffer -- confirm against the Akka Streams version in use.
   *
   * @param parallelism size used for both the initial and the maximum input buffer
   * @param f           asynchronous transformation applied to each element
   * @tparam A element type entering the flow
   * @tparam B element type produced by `f`
   * @return a flow from `A` to `B`
   */
  def mapAsyncUnorderedWithBoundedParallelism[A, B](parallelism: Int)(f: A => Future[B]): Flow[A, B] =
    Flow[A].section(fixedInputBuffer(parallelism))(_.mapAsyncUnordered(f))

  /**
   * Applies `f` via `mapAsync` inside a section whose input buffer has `initial`
   * and `max` both equal to `parallelism`.
   *
   * NOTE(review): as with the unordered variant, the effective bound on in-flight
   * futures depends on the stage's use of its input buffer -- confirm.
   *
   * @param parallelism size used for both the initial and the maximum input buffer
   * @param f           asynchronous transformation applied to each element
   * @tparam A element type entering the flow
   * @tparam B element type produced by `f`
   * @return a flow from `A` to `B`
   */
  def mapAsyncWithBoundedParallelism[A, B](parallelism: Int)(f: A => Future[B]): Flow[A, B] =
    Flow[A].section(fixedInputBuffer(parallelism))(_.mapAsync(f))

  /**
   * Shorthand for [[mapAsyncWithBoundedParallelism]] with a parallelism of 1.
   *
   * NOTE(review): the name suggests this is meant to make the side effects of `f`
   * run one at a time, in element order -- verify with the Akka version in use.
   *
   * @param f asynchronous transformation applied to each element
   * @tparam A element type entering the flow
   * @tparam B element type produced by `f`
   * @return a flow from `A` to `B`
   */
  def mapAsyncWithOrderedSideEffect[A, B](f: A => Future[B]): Flow[A, B] =
    mapAsyncWithBoundedParallelism[A, B](1)(f)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment