Last active
June 6, 2022 08:32
-
-
Save davideicardi/d3b383e5945a44252931582f83ecadc2 to your computer and use it in GitHub Desktop.
Akka stream generic pass through flow. For latest implementation see https://developer.lightbend.com/docs/alpakka/current/patterns.html#passthrough
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
https://scalafiddle.io/sf/sniohcZ/3 | |
(old version: https://scalafiddle.io/sf/sniohcZ/1) | |
Use PassThroughFlow when a message has to go through a flow that
transforms it, but you also need to keep the original message for
a subsequent flow.
For example if you consume messages from Kafka (CommittableMessage). | |
You process the message (transform, save it inside a database, ...) and then you need again the original message | |
to commit the offset. | |
                        a=>transform=>a1
                       /                 \
                      /                   \
-> a=>(a, a)=>unzip -                       zip=>(a1, a)=> a
                      \                   /
                       \                 /
                        --------a--------
Usage: | |
val source = Source(List(1, 2, 3)) | |
val printerFlow = | |
Flow[Int] | |
.map(println) | |
val plus10Flow = | |
Flow[Int] | |
.map(_ + 10) | |
source | |
.via(PassThroughFlow(printerFlow, Keep.right)) | |
.via(plus10Flow) | |
.runWith(Sink.foreach(println)) | |
*/ | |
import akka._ | |
import akka.stream._ | |
import akka.stream.scaladsl._ | |
import akka.actor._ | |
/**
 * Factory for a "pass-through" graph: every incoming element is sent through
 * `processingFlow` while a copy of the original element is kept on a side
 * branch, so both can be combined again once processing is done.
 *
 * Internally the element is duplicated with a two-way `Broadcast`; one copy goes
 * through `processingFlow`, the other is wired directly to a `ZipWith` stage that
 * joins the two branches.
 *
 * NOTE(review): the zip stage pairs elements positionally, so `processingFlow`
 * is presumably expected to emit exactly one element per input element, in
 * order - confirm before using it with flows that filter or expand.
 */
object PassThroughFlow {

  /**
   * Pass-through variant that emits the processed value together with the
   * original element as a `(T, A)` tuple.
   *
   * @param processingFlow flow applied to each incoming element
   */
  def apply[A, T](processingFlow: Flow[A, T, NotUsed]): Graph[FlowShape[A, (T, A)], NotUsed] =
    apply[A, T, (T, A)](processingFlow, Keep.both)

  /**
   * Pass-through variant with a caller-supplied combiner.
   *
   * @param processingFlow flow applied to each incoming element
   * @param output         combines the processed value (first argument) with the
   *                       original element (second argument) into the emitted value
   */
  def apply[A, T, O](processingFlow: Flow[A, T, NotUsed], output: (T, A) => O): Graph[FlowShape[A, O], NotUsed] = {
    val passThroughGraph = GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      // Fan out each element to the processing branch and the bypass branch.
      val fanOut = b.add(Broadcast[A](2))
      // Re-join the processed value with the untouched original.
      val joiner = b.add(ZipWith[T, A, O]((processed, original) => output(processed, original)))

      // format: off
      fanOut.out(0) ~> processingFlow ~> joiner.in0
      fanOut.out(1)        ~>            joiner.in1
      // format: on

      FlowShape(fanOut.in, joiner.out)
    }

    Flow.fromGraph(passThroughGraph)
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorSystem | |
import akka.stream.scaladsl._ | |
import akka.testkit.TestKit | |
import org.scalatest.BeforeAndAfterAll | |
import org.scalatest.funspec.AsyncFunSpecLike | |
import org.scalatest.matchers.should.Matchers | |
/**
 * Async spec for `PassThroughFlow`: checks that the original element survives
 * the processing flow, both when only the original is kept and when the
 * processed value and the original are emitted together.
 */
class PassThroughFlowSpec
    extends TestKit(ActorSystem("AdapterControlGraphSpec"))
    with AsyncFunSpecLike
    with BeforeAndAfterAll
    with Matchers {

  // Stop the actor system created for this spec once every test has finished.
  override protected def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  // Sample input shared by the tests; a `def` so each test builds its own blueprint.
  private def numbers = Source(List(1, 2, 3))

  // Flow under pass-through: multiplies each element by ten.
  private def timesTen = Flow[Int].map(_ * 10)

  describe("PassThroughFlow") {

    it(" original message is maintained ") {
      // Keep.right discards the processed value and keeps only the original.
      numbers
        .via(PassThroughFlow(timesTen, Keep.right))
        .runWith(Sink.seq)
        .map { collected =>
          collected shouldBe Vector(1, 2, 3)
        }
    }

    it(" original message and pass through flow output are returned ") {
      // Default combiner emits (processed, original) tuples.
      numbers
        .via(PassThroughFlow(timesTen))
        .runWith(Sink.seq)
        .map { collected =>
          collected shouldBe Vector((10, 1), (20, 2), (30, 3))
        }
    }
  }
}
@LukeDefeo Yes, that is correct: order is maintained. I have also updated the example with a simpler version, and I have added an async boundary to increase throughput.
Nice, this is the Java equivalent:
/**
 * Java counterpart of the Scala {@code PassThroughFlow}: runs every element through
 * {@code sourceFlow} while keeping the original element, then combines both with
 * {@code output}.
 *
 * <p>Each element is duplicated by a two-way broadcast. One copy goes through
 * {@code sourceFlow}; the other is wired directly to the zip stage.
 *
 * @param sourceFlow flow applied to each incoming element
 * @param output     combines the processed value (first argument) with the original
 *                   element (second argument) into the emitted value
 * @return a flow-shaped graph from {@code I} to {@code M}
 */
static <I, O, M> Graph<FlowShape<I, M>, NotUsed> passThrough(Flow<I, O, NotUsed> sourceFlow, Function2<O, I, M> output){
    return GraphDSL.create(
        builder -> {
            final UniformFanOutShape<I, I> broadcast = builder.add(Broadcast.create(2));
            final FanInShape2<O, I, M> zip =
                builder.add(ZipWith.create(output));

            // Processing branch: broadcast -> sourceFlow -> first zip input.
            builder.from(broadcast.out(0))
                .via(builder.add(sourceFlow))
                .toInlet(zip.in0());

            // Bypass branch: connect the outlet straight to the second zip input.
            // No identity flow is needed in between; it only adds an extra stage.
            builder.from(broadcast.out(1))
                .toInlet(zip.in1());

            return new FlowShape<>(broadcast.in(), zip.out());
        });
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Thanks! Am I right in assuming that the zipper will take elements from each side in order and non-async?