Skip to content

Instantly share code, notes, and snippets.

@calvinlfer
Last active March 30, 2019 13:23
Show Gist options
  • Save calvinlfer/cc4ea90328834a95a89ce99aeb998a63 to your computer and use it in GitHub Desktop.
Save calvinlfer/cc4ea90328834a95a89ce99aeb998a63 to your computer and use it in GitHub Desktop.
Akka Streams Flow that distributes messages (according to a hashing function) across sub-flows. The idea is to have ordered processing per sub-flow but parallel processing across sub-flows.
import akka.stream._
import akka.stream.scaladsl._
/***
* Example based on numBuckets = 3
* --- bucket 1 flow --- ~mapAsync(parallelism)~ ---
* |------------------| / \|---------------|
* Open inlet[A] --- | Partition Fan Out| --- bucket 2 flow --- ~mapAsync(parallelism)~ -----| Merge Fan In | --- Open outlet[B]
* |------------------| \ /|---------------|
* --- bucket 3 flow --- ~mapAsync(parallelism)~ ---
*
*
* @param numBuckets the number of sub-flows to create
* @param parallelism the mapAsync (ordered) parallelism per sub flow
* @param hash the hashing function used to decide
* @param fn is the mapping function to be used for mapAsync
* @tparam A is the input stream of elements of type A
* @tparam B is the output streams of elements of type B
* @return a Flow of elements from type A to type B
*/
def hashingDistribution[A, B](numBuckets: Int,
parallelism: Int,
hash: A => Int,
fn: A => Future[B]): Flow[A, B, NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
import GraphDSL.Implicits._
val numPorts = numBuckets
val partitioner =
builder.add(Partition[A](outputPorts = numPorts, partitioner = a => math.abs(hash(a)) % numPorts))
val merger = builder.add(Merge[B](inputPorts = numPorts, eagerComplete = false))
Range(0, numPorts).foreach { eachPort =>
partitioner.out(eachPort) ~> Flow[A].mapAsync(parallelism)(fn) ~> merger.in(eachPort)
}
FlowShape(partitioner.in, merger.out)
})
@calvinlfer
Copy link
Author

Example usage:

implicit val system: ActorSystem =  ActorSystem("streaming-system", ConfigFactory.load("akka {}"))
implicit val mat: ActorMaterializer = ActorMaterializer()
implicit val ec: ExecutionContext = system.dispatcher

def sampleAsyncCall(x: Int): Future[Int] = Future {
  Thread.sleep((x * 100L) % 10)
  x
}

Source
  .repeat(1 to 10000)
  .mapConcat(identity)
  .via(
    hashingDistribution[Int, Int](
      numBuckets = 100,
      parallelism = 2,
      hash = element => MurmurHash3.stringHash(element.toString),
      fn = sampleAsyncCall
    )
  )
  .to(Sink.foreach(println))
  .run()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment