@iomonad
Forked from davideicardi/PassThroughFlow.scala
Created November 5, 2018 14:39
Akka stream generic pass through flow. For latest implementation see https://developer.lightbend.com/docs/alpakka/current/patterns.html#passthrough
/*
https://scalafiddle.io/sf/sniohcZ/1

Use PassThroughFlow when a message must go through a flow that transforms it,
but a following flow still needs the original message.

For example, when you consume messages from Kafka (CommittableMessage), you
process the message (transform it, save it in a database, ...) and then need
the original message again to commit the offset.

Note: Zip pairs elements by position, so processingFlow must emit exactly one
element per input, in order. Otherwise results get paired with the wrong originals.

                      a=>transform=>a1
                     /                 \
                    /                   \
-> a=>(a, a)=>unzip-                     zip=>(a1, a)=> a
                    \                   /
                     \                 /
                      --------a--------

Usage (requires an implicit ActorSystem or Materializer in scope):

  val source = Source(List(1, 2, 3))

  val printerFlow =
    Flow[Int]
      .map(println)

  val plus10Flow =
    Flow[Int]
      .map(_ + 10)

  source
    .via(PassThroughFlow(printerFlow))
    .via(plus10Flow)
    .runWith(Sink.foreach(println))
*/
import akka._
import akka.stream._
import akka.stream.scaladsl._
import akka.actor._

object PassThroughFlow {
  def apply[A, T](processingFlow: Flow[A, T, NotUsed]): Graph[FlowShape[A, A], NotUsed] = {
    Flow.fromGraph(GraphDSL.create() {
      implicit builder => {
        import GraphDSL.Implicits._

        // Duplicate each element so one copy can be processed and one kept as-is.
        val tuplerizer = builder.add(Flow[A].map(x => (x, x)))
        // Drop the processing result and keep only the original element.
        val detuplerizer = builder.add(Flow[(T, A)].map(_._2))

        val unzip = builder.add(Unzip[A, A]())
        val zip = builder.add(Zip[T, A]())

        val passThroughFlow = Flow[A].map(identity)

        tuplerizer ~> unzip.in
        unzip.out0 ~> processingFlow ~> zip.in0
        unzip.out1 ~> passThroughFlow ~> zip.in1
        zip.out ~> detuplerizer

        FlowShape(tuplerizer.in, detuplerizer.out)
      }
    })
  }
}
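
The Kafka use case from the header comment often needs the processing result as well as the original message. Below is a minimal sketch of such a variant, loosely in the spirit of the Alpakka pattern linked in the description but not a copy of it. It uses `Broadcast` and `ZipWith` instead of the tuple/`Unzip`/`Zip` chain. The names `Message`, `PassThroughWithResult` and `PassThroughWithResultDemo` are hypothetical, and the code assumes Akka 2.6, where an implicit `ActorSystem` also supplies the materializer.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.FlowShape
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Sink, Source, ZipWith}

import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical stand-in for a Kafka CommittableMessage: a payload plus the offset to commit.
final case class Message(offset: Long, payload: String)

object PassThroughWithResult {
  // Emits (result, original) pairs. Like the Zip-based version, this assumes
  // processingFlow emits exactly one element per input and preserves order.
  def apply[A, T](processingFlow: Flow[A, T, NotUsed]): Flow[A, (T, A), NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      // Send every element down both branches.
      val broadcast = builder.add(Broadcast[A](2))
      // Pair each processing result with the untouched original.
      val zip = builder.add(ZipWith[T, A, (T, A)]((result, original) => (result, original)))

      broadcast.out(0) ~> processingFlow ~> zip.in0
      broadcast.out(1) ~> zip.in1

      FlowShape(broadcast.in, zip.out)
    })
}

object PassThroughWithResultDemo {
  // Returns (payload length, offset) for each message.
  def run(): Seq[(Int, Long)] = {
    // Assumption: Akka 2.6+, where the implicit ActorSystem provides the Materializer.
    implicit val system: ActorSystem = ActorSystem("pass-through-demo")
    try {
      // Stand-in for real processing (transform, save to a database, ...).
      val processing = Flow[Message].map(_.payload.length)

      val done = Source(List(Message(0L, "a"), Message(1L, "bb"), Message(2L, "ccc")))
        .via(PassThroughWithResult(processing))
        // Downstream still sees the original message, so its offset could be committed here.
        .map { case (length, original) => (length, original.offset) }
        .runWith(Sink.seq)

      Await.result(done, 5.seconds)
    } finally system.terminate()
  }
}
```

Calling `PassThroughWithResultDemo.run()` yields the pairs `(1, 0)`, `(2, 1)`, `(3, 2)`: each payload length next to the offset of the message it came from. Mapping those pairs with `_._2` gives the same behaviour as `PassThroughFlow` above.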