Skip to content

Instantly share code, notes, and snippets.

@davideicardi
Last active June 6, 2022 08:32
Show Gist options
  • Save davideicardi/d3b383e5945a44252931582f83ecadc2 to your computer and use it in GitHub Desktop.
Save davideicardi/d3b383e5945a44252931582f83ecadc2 to your computer and use it in GitHub Desktop.
Akka stream generic pass through flow. For latest implementation see https://developer.lightbend.com/docs/alpakka/current/patterns.html#passthrough
/*
https://scalafiddle.io/sf/sniohcZ/3
(old version: https://scalafiddle.io/sf/sniohcZ/1)
Use PassThroughFlow when you have a message that should be used in a
flow that trasform it but you want to maintain the original message for
another following flow.
For example if you consume messages from Kafka (CommittableMessage).
You process the message (transform, save it inside a database, ...) and then you need again the original message
to commit the offset.
a=>transform=>a1
/ \
/ \
-> a=>(a, a)=>unzip- zip=>(a1, a)=> a
\ /
\ /
--------a--------
Usage:
val source = Source(List(1, 2, 3))
val printerFlow =
Flow[Int]
.map(println)
val plus10Flow =
Flow[Int]
.map(_ + 10)
source
.via(PassThroughFlow(printerFlow, Keep.right))
.via(plus10Flow)
.runWith(Sink.foreach(println))
*/
import akka._
import akka.stream._
import akka.stream.scaladsl._
import akka.actor._
object PassThroughFlow {
def apply[A, T](processingFlow: Flow[A, T, NotUsed]): Graph[FlowShape[A, (T, A)], NotUsed] =
apply[A, T, (T, A)](processingFlow, Keep.both)
def apply[A, T, O](processingFlow: Flow[A, T, NotUsed], output: (T, A) => O): Graph[FlowShape[A, O], NotUsed] =
Flow.fromGraph(GraphDSL.create() { implicit builder =>
{
import GraphDSL.Implicits._
val broadcast = builder.add(Broadcast[A](2))
val zip = builder.add(ZipWith[T, A, O]((left, right) => output(left, right)))
// format: off
broadcast.out(0) ~> processingFlow ~> zip.in0
broadcast.out(1) ~> zip.in1
// format: on
FlowShape(broadcast.in, zip.out)
}
})
}
import akka.actor.ActorSystem
import akka.stream.scaladsl._
import akka.testkit.TestKit
import org.scalatest.BeforeAndAfterAll
import org.scalatest.funspec.AsyncFunSpecLike
import org.scalatest.matchers.should.Matchers
class PassThroughFlowSpec extends TestKit(ActorSystem("AdapterControlGraphSpec")) with AsyncFunSpecLike with BeforeAndAfterAll with Matchers {
override protected def afterAll(): Unit = TestKit.shutdownActorSystem(system)
describe("PassThroughFlow") {
it(" original message is maintained ") {
//Sample Source
val source = Source(List(1, 2, 3))
// Pass through this flow maintaining the original message
val passThroughMe =
Flow[Int]
.map(_ * 10)
val retFuture = source
.via(PassThroughFlow(passThroughMe, Keep.right))
.runWith(Sink.seq)
//Verify results
retFuture.map { ret =>
ret should be(Vector(1, 2, 3))
}
}
it(" original message and pass through flow output are returned ") {
//Sample Source
val source = Source(List(1, 2, 3))
// Pass through this flow maintaining the original message
val passThroughMe =
Flow[Int]
.map(_ * 10)
val retFut = source
.via(PassThroughFlow(passThroughMe))
.runWith(Sink.seq)
//Verify results
retFut.map { ret =>
ret should be(Vector((10, 1), (20, 2), (30, 3)))
}
}
}
}
@stikkos
Copy link

stikkos commented Jul 29, 2019

Nice, this is the Java equivalent:

static <I, O, M>  Graph<FlowShape<I, M>, NotUsed> passThrough(Flow<I, O, NotUsed> sourceFlow, Function2<O, I, M> output){
        return GraphDSL.create(
                        builder -> {

                            final UniformFanOutShape<I, I> broadcast = builder.add(Broadcast.create(2));

                            final FanInShape2<O, I, M> zip =
                                    builder.add(ZipWith.create(output));

                            builder.from(broadcast.out(0))
                                   .via(builder.add(sourceFlow))
                                   .toInlet(zip.in0());

                            builder.from(broadcast.out(1))
                                   .via(builder.add(Flow.fromFunction(i -> i)))
                                   .toInlet(zip.in1());

                            return new FlowShape<>(broadcast.in(), zip.out());
                        });
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment