Skip to content

Instantly share code, notes, and snippets.

@erangaeb
Created February 3, 2019 02:25
Show Gist options
  • Save erangaeb/ba67d64d442d95637cc9e6c549366f23 to your computer and use it in GitHub Desktop.
Save erangaeb/ba67d64d442d95637cc9e6c549366f23 to your computer and use it in GitHub Desktop.
broadcast akka streams
def broadcast() = {
// source with list, list element contains[(String, Int)]
val langs: List[(String, Int)] = List(("Scala", 5), ("Golang", 8), ("Haskell", 7), ("Erlang", 5))
val source = Source(langs)
// two flows
// 1. to extract name from list element[(String, Int)]
// 2. to extract rate from list element[(String, Int)]
val nameFlow = Flow[(String, Int)].map(_._1)
val rateFlow = Flow[(String, Int)].map(_._2)
// two sinks
// 1. print name[String]
// 2. print rate[Int]
val nameSink = Sink.foreach[String](p => println(s"Name - $p"))
val rateSink = Sink.foreach[Int](p => println(s"Rate - $p"))
// graph
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit b =>
import GraphDSL.Implicits._
// broadcaster to broadcast list elements[(String, Int)] with two streams
val broadcaster = b.add(Broadcast[(String, Int)](2))
// source directs to broadcaster
source ~> broadcaster.in
// broadcast list element with two streams
// 1. to name sink
// 2. to rate sink
broadcaster.out(0) ~> nameFlow ~> nameSink
broadcaster.out(1) ~> rateFlow ~> rateSink
ClosedShape
})
graph.run()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment