Last active
June 18, 2019 16:25
-
-
Save klpx/6b9b9e3a88ecbd1452da095223306505 to your computer and use it in GitHub Desktop.
Akka Streams. Graph for Split Either[L,R] to L and R flows
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Copy left Alexander Hasselbach
 *
 * Usage (inside a GraphDSL builder block, where `b` is the builder and
 * `GraphDSL.Implicits._` is imported):
 *
 *   val E = b.add(splitEither[Throwable, Int])
 *   val parsed = b.add(Flow[Either[Throwable, Int]])
 *   val valids = b.add(Flow[Int])
 *   val invalids = b.add(Flow[Throwable])
 *
 *   parsed ~> E.in
 *   E.left ~> invalids
 *   E.right ~> valids
 */
import akka.NotUsed | |
import akka.stream.Graph | |
import akka.stream.scaladsl.{Flow, GraphDSL, Broadcast} | |
/**
 * Builds a fan-out graph that separates a stream of `Either[L, R]` into two typed outputs.
 *
 * Each incoming element is broadcast to two branches; the left branch keeps only the payloads
 * of `Left` values and the right branch keeps only the payloads of `Right` values.
 *
 * @tparam L payload type emitted on the `left` outlet
 * @tparam R payload type emitted on the `right` outlet
 * @return a graph whose shape exposes `in`, `left` and `right` ports
 */
def splitEither[L, R]: Graph[EitherFanOutShape[Either[L, R], L, R], NotUsed] =
  GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val entry = builder.add(Flow[Either[L, R]])
    val fanOut = builder.add(Broadcast[Either[L, R]](outputPorts = 2))
    val lefts = builder.add(Flow[Either[L, R]].collect { case Left(value) => value })
    val rights = builder.add(Flow[Either[L, R]].collect { case Right(value) => value })

    // Broadcast port 0 feeds the Left extractor, port 1 feeds the Right extractor.
    entry.out ~> fanOut.in
    fanOut.out(0) ~> lefts.in
    fanOut.out(1) ~> rights.in

    new EitherFanOutShape(entry.in, lefts.out, rights.out)
  }
/// | |
import akka.stream.{FanOutShape, Inlet, Outlet} | |
/**
 * Fan-out shape with a single inlet of type `In` and two typed outlets, `left` and `right`.
 *
 * @tparam In element type accepted by the inlet
 * @tparam L  element type emitted by the `left` outlet
 * @tparam R  element type emitted by the `right` outlet
 */
class EitherFanOutShape[In, L, R](_init: FanOutShape.Init[In]) extends FanOutShape[In](_init) {

  // Outlets are registered in declaration order: `left` first, then `right`. The port-based
  // constructor below passes its outlets in the same order, so keep the two in sync.
  val left: Outlet[L] = newOutlet[L]("left")
  val right: Outlet[R] = newOutlet[R]("right")

  /** Builds the shape from a name, delegating to `FanOutShape.Name`. */
  def this(name: String) = this(FanOutShape.Name[In](name))

  /** Builds the shape from already existing ports, delegating to `FanOutShape.Ports`. */
  def this(in: Inlet[In], left: Outlet[L], right: Outlet[R]) =
    this(FanOutShape.Ports(in, left :: right :: Nil))

  /** Creates a new shape of this same class from the given initializer. */
  override protected def construct(init: FanOutShape.Init[In]): FanOutShape[In] =
    new EitherFanOutShape[In, L, R](init)

  /** Narrows the result of the parent's `deepCopy()` back to this class's type. */
  override def deepCopy(): EitherFanOutShape[In, L, R] =
    super.deepCopy().asInstanceOf[EitherFanOutShape[In, L, R]]
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment