@atamborrino
Last active July 20, 2016 12:49
Akka Streams groupBy: we want ordered processing within each substream and parallel processing across substreams
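/// Working version
import java.util.concurrent.Executors
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Assumed setup, not shown in the original snippet: an implicit system and materializer
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()

// Dedicated pool for the blocking work: 10 threads to match the 10 partitions
val fixedThreadPool = ExecutionContext.fromExecutorService(
  Executors.newFixedThreadPool(10)
)

case class KafkaRecord(partition: Int, value: Int)
val records = (1 until 1000).map(i => KafkaRecord(i % 10, i))

// mapAsync(1) handles one record at a time, so order is kept within a substream
val flow = Flow[KafkaRecord].mapAsync(1) { record =>
  Future {
    println(s"Processing $record")
    Thread.sleep(1000) // simulated blocking work
  }(fixedThreadPool)
}
  .withAttributes(Attributes.inputBuffer(initial = 16, max = 16))
  .async

val futDone = Source(records)
  .groupBy(10, _.partition) // one substream per partition
  .via(flow)
  .mergeSubstreams
  .runWith(Sink.ignore)

Await.result(futDone, 2.minutes)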
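One way to check the per-substream ordering is to record the values as they are processed and compare each partition's sequence against its sorted form. The sketch below is only an illustration under assumptions: the names `OrderingCheck`, `processedValuesByPartition`, `seen` and `pool` are hypothetical, the record count and sleep are reduced so it finishes quickly, and it assumes an Akka version where `ActorMaterializer` is available.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors}
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical helper, not part of the original snippet
object OrderingCheck {
  case class KafkaRecord(partition: Int, value: Int)

  // Runs a reduced version of the pipeline and returns, for each partition,
  // the values in the order in which they were processed.
  def processedValuesByPartition(): Map[Int, List[Int]] = {
    implicit val system: ActorSystem = ActorSystem()
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    val pool = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(10))
    try {
      val records = (1 until 100).map(i => KafkaRecord(i % 10, i))
      // Thread-safe queue: insertion order reflects processing order
      val seen = new ConcurrentLinkedQueue[KafkaRecord]()

      val flow = Flow[KafkaRecord].mapAsync(1) { record =>
        Future {
          Thread.sleep(10) // short simulated work
          seen.add(record)
          record
        }(pool)
      }.async

      val done = Source(records)
        .groupBy(10, _.partition)
        .via(flow)
        .mergeSubstreams
        .runWith(Sink.ignore)
      Await.result(done, 1.minute)

      seen.toArray(new Array[KafkaRecord](0)).toList
        .groupBy(_.partition)
        .map { case (partition, rs) => partition -> rs.map(_.value) }
    } finally {
      pool.shutdown()
      system.terminate()
    }
  }

  def main(args: Array[String]): Unit = {
    val byPartition = processedValuesByPartition()
    val ordered = byPartition.values.forall(vs => vs == vs.sorted)
    println(s"ordered within every partition: $ordered")
  }
}
```

Since the input values increase within each partition, a partition whose recorded sequence is not sorted would mean its records were processed out of order. The check says nothing about ordering between partitions, which is expected to interleave.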