-
-
Save iomonad/8f1f5b664955b267b39d4cf2c74ffadf to your computer and use it in GitHub Desktop.
Akka stream generic pass through flow. For latest implementation see https://developer.lightbend.com/docs/alpakka/current/patterns.html#passthrough
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
https://scalafiddle.io/sf/sniohcZ/1 | |
Use PassThroughFlow when you have a message that should be used in a | |
flow that trasform it but you want to maintain the original message for | |
another following flow. | |
For example if you consume messages from Kafka (CommittableMessage). | |
You process the message (transform, save it inside a database, ...) and then you need again the original message | |
to commit the offset. | |
a=>transform=>a1 | |
/ \ | |
/ \ | |
-> a=>(a, a)=>unzip- zip=>(a1, a)=> a | |
\ / | |
\ / | |
--------a-------- | |
Usage: | |
val source = Source(List(1, 2, 3)) | |
val printerFlow = | |
Flow[Int] | |
.map(println) | |
val plus10Flow = | |
Flow[Int] | |
.map(_ + 10) | |
source | |
.via(PassThroughFlow(printerFlow)) | |
.via(plus10Flow) | |
.runWith(Sink.foreach(println)) | |
*/ | |
import akka._ | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
import akka.actor._ | |
object PassThroughFlow { | |
def apply[A, T](processingFlow: Flow[A, T, NotUsed]): Graph[FlowShape[A, A], NotUsed] = { | |
Flow.fromGraph(GraphDSL.create() { | |
implicit builder => { | |
import GraphDSL.Implicits._ | |
val tuplerizer = builder.add(Flow[A].map(x => (x, x))) | |
val detuplerizer = builder.add(Flow[(T, A)].map(_._2)) | |
val unzip = builder.add(Unzip[A, A]()) | |
val zip = builder.add(Zip[T, A]()) | |
val passThroughFlow = Flow[A].map(identity) | |
tuplerizer ~> unzip.in | |
unzip.out0 ~> processingFlow ~> zip.in0 | |
unzip.out1 ~> passThroughFlow ~> zip.in1 | |
zip.out ~> detuplerizer | |
FlowShape(tuplerizer.in, detuplerizer.out) | |
} | |
}) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment