Last active
April 18, 2017 12:28
-
-
Save klpx/e39bc4f3edbc98ab7a9df43cda042d3a to your computer and use it in GitHub Desktop.
Akka Streams: a graph that merges an L flow and an R flow into a single Either[L, R] flow
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
Copy left Alexander Hasselbach | |
Usage: | |
val E = b.add(mergeEither[Throwable,Int]) | |
val parsed = b.add(Flow[Either[Throwable,Int]]) | |
val valids = b.add(Flow[Int]) | |
val invalids = b.add(Flow[Throwable]) | |
valids ~> E.right | |
invalids ~> E.left | |
E.out ~> parsed | |
*/ | |
import akka.NotUsed | |
import akka.stream.Graph | |
import akka.stream.scaladsl.{Flow, GraphDSL, Merge} | |
/**
 * Builds a fan-in graph with two typed inputs and one `Either` output.
 *
 * Elements arriving on the `left` inlet are wrapped in `Left`, elements
 * arriving on the `right` inlet are wrapped in `Right`, and both wrapped
 * streams are fed into a two-port `Merge` whose outlet becomes the outlet
 * of the resulting shape.
 *
 * @tparam L element type accepted on the `left` inlet
 * @tparam R element type accepted on the `right` inlet
 * @return a graph exposing an [[EitherFanInShape]] that emits `Either[L, R]`
 */
def mergeEither[L, R]: Graph[EitherFanInShape[L, R, Either[L, R]], NotUsed] =
  GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // One wrapping stage per side, lifting the raw element into Either.
    val wrapLeft = builder.add(Flow[L].map(Left.apply[L, R]))
    val wrapRight = builder.add(Flow[R].map(Right.apply[L, R]))

    // Two-input junction combining both wrapped streams.
    val merge = builder.add(Merge[Either[L, R]](inputPorts = 2))

    wrapLeft.out ~> merge.in(0)
    wrapRight.out ~> merge.in(1)

    new EitherFanInShape(wrapLeft.in, wrapRight.in, merge.out)
  }
/// | |
import akka.stream.{FanInShape, Inlet, Outlet} | |
/**
 * Fan-in shape with two differently typed inlets, `left` and `right`,
 * and a single outlet.
 *
 * @tparam L   element type of the `left` inlet
 * @tparam R   element type of the `right` inlet
 * @tparam Out element type of the outlet
 * @param _init initial port description forwarded to `FanInShape`
 */
class EitherFanInShape[L, R, Out](_init: FanInShape.Init[Out]) extends FanInShape[Out](_init) {

  /** Creates a shape from a name only, via `FanInShape.Name`. */
  def this(name: String) = this(FanInShape.Name[Out](name))

  /**
   * Creates a shape around already existing ports.
   * The inlets are registered in the order `left`, `right`.
   */
  def this(left: Inlet[L], right: Inlet[R], out: Outlet[Out]) =
    this(FanInShape.Ports(out, left :: right :: Nil))

  // Type arguments are spelled out so that the new instance is really an
  // EitherFanInShape[L, R, Out]; without them L and R would be inferred as
  // Nothing and the cast in deepCopy would only hold thanks to erasure.
  override protected def construct(init: FanInShape.Init[Out]): FanInShape[Out] =
    new EitherFanInShape[L, R, Out](init)

  /** Narrows the result of the inherited copy to this shape's own type. */
  override def deepCopy(): EitherFanInShape[L, R, Out] =
    super.deepCopy().asInstanceOf[EitherFanInShape[L, R, Out]]

  // NOTE(review): keep `left` declared before `right` - this mirrors the
  // `left :: right :: Nil` order used by the ports constructor above;
  // presumably newInlet hands out registered ports in that order (confirm
  // against the FanInShape implementation in use).
  val left: Inlet[L] = newInlet[L]("left")
  val right: Inlet[R] = newInlet[R]("right")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment