Skip to content

Instantly share code, notes, and snippets.

@klpx
Last active June 18, 2019 16:25
Show Gist options
  • Save klpx/6b9b9e3a88ecbd1452da095223306505 to your computer and use it in GitHub Desktop.
Save klpx/6b9b9e3a88ecbd1452da095223306505 to your computer and use it in GitHub Desktop.
Akka Streams: a graph that splits a stream of Either[L, R] into separate L and R flows
/**
Copyleft Alexander Hasselbach
Usage (inside a GraphDSL.create() { implicit b => ... } block, with GraphDSL.Implicits._ imported):
val E = b.add(splitEither[Throwable,Int])
val parsed = b.add(Flow[Either[Throwable,Int]])
val valids = b.add(Flow[Int])
val invalids = b.add(Flow[Throwable])
parsed ~> E.in
E.left ~> invalids
E.right ~> valids
*/
import akka.NotUsed
import akka.stream.Graph
import akka.stream.scaladsl.{Flow, GraphDSL, Broadcast}
/**
 * Builds a fan-out graph that separates a stream of `Either[L, R]` into two streams:
 * the payloads of `Left` values are emitted on the `left` outlet and the payloads of
 * `Right` values on the `right` outlet.
 *
 * Each incoming element is handed to a two-port `Broadcast`; the branch behind each port
 * keeps only its own side via `collect` and drops the other.
 *
 * @tparam L type carried by `Left` elements
 * @tparam R type carried by `Right` elements
 * @return a graph whose shape exposes one inlet for `Either[L, R]` plus the `left` and
 *         `right` outlets
 */
def splitEither[L, R]: Graph[EitherFanOutShape[Either[L, R], L, R], NotUsed] =
  GraphDSL.create() { implicit b =>
    import GraphDSL.Implicits._

    val bcast = b.add(Broadcast[Either[L, R]](outputPorts = 2))
    val leftOut = b.add(Flow[Either[L, R]].collect { case Left(l) => l })
    val rightOut = b.add(Flow[Either[L, R]].collect { case Right(r) => r })

    // Wire the broadcast ports explicitly so the mapping does not depend on connection order.
    bcast.out(0) ~> leftOut
    bcast.out(1) ~> rightOut

    // The broadcast inlet is exposed directly; a separate identity stage in front of it
    // would only add a pass-through hop.
    new EitherFanOutShape(bcast.in, leftOut.out, rightOut.out)
  }
///
import akka.stream.{FanOutShape, Inlet, Outlet}
/**
 * Fan-out shape with a single inlet and two differently typed outlets, `left` and `right`.
 *
 * @param _init port initialiser passed straight to `FanOutShape`
 * @tparam In element type accepted by the inlet
 * @tparam L  element type emitted by the `left` outlet
 * @tparam R  element type emitted by the `right` outlet
 */
class EitherFanOutShape[In, L, R](_init: FanOutShape.Init[In]) extends FanOutShape[In](_init) {

  /** Creates the shape from a name, via `FanOutShape.Name`. */
  def this(name: String) =
    this(FanOutShape.Name[In](name))

  /**
   * Creates the shape around already existing ports.
   *
   * NOTE(review): the outlets are supplied in (left, right) order, matching the declaration
   * order of the `left` / `right` vals below — presumably `newOutlet` hands them out in
   * that order; keep the two in sync.
   */
  def this(in: Inlet[In], left: Outlet[L], right: Outlet[R]) =
    this(FanOutShape.Ports(in, List(left, right)))

  /** Outlet carrying the `L` elements. */
  val left: Outlet[L] = newOutlet[L]("left")

  /** Outlet carrying the `R` elements. */
  val right: Outlet[R] = newOutlet[R]("right")

  /** Factory hook of `FanOutShape`: produces another instance of this shape from `init`. */
  override protected def construct(init: FanOutShape.Init[In]): FanOutShape[In] =
    new EitherFanOutShape[In, L, R](init)

  /** Narrows the result of the inherited `deepCopy()` to this shape type. */
  override def deepCopy(): EitherFanOutShape[In, L, R] =
    super.deepCopy().asInstanceOf[EitherFanOutShape[In, L, R]]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment