Created
April 6, 2019 18:30
-
-
Save gabfssilva/4489d6961d9e3cb5ea5cc5ec922ce861 to your computer and use it in GitHub Desktop.
EitherFlow
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package utils | |
import akka.NotUsed | |
import akka.stream.FlowShape | |
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition} | |
import scala.concurrent.Future | |
import scala.concurrent.ExecutionContext.Implicits.global | |
object EitherFlow extends EitherFlow | |
trait EitherFlow { | |
/** | |
* Creates a Flow[Either[R, L], Result, NotUsed] that can choose between two flows based on the materialized value | |
* | |
* @param leftFlow : flow used for left values | |
* @param rightFlow : flow used for right values | |
* @return merge flow from either leftFlow or rightFlow | |
*/ | |
def either[L, R, Result](leftFlow: Flow[L, Result, NotUsed], | |
rightFlow: Flow[R, Result, NotUsed]): Flow[Either[L, R], Result, NotUsed] = | |
Flow.fromGraph(GraphDSL.create() { implicit b => | |
import GraphDSL.Implicits._ | |
val eitherIn = b.add(Partition[Either[L, R]](2, { | |
case _: Right[L, R] => 0 | |
case _: Left[L, R] => 1 | |
})) | |
val result = b.add(Merge[Result](2)) | |
eitherIn.out(0).collect { case Right(v) => v } ~> rightFlow ~> result | |
eitherIn.out(1).collect { case Left(v) => v } ~> leftFlow ~> result | |
FlowShape(eitherIn.in, result.out) | |
}) | |
/** | |
* Creates a Flow[Either[R, L], Result, NotUsed] that can choose between two flows based on the materialized value | |
* | |
* @param rightFlow : flow used for right values | |
* @return merge flow from either leftFlow or rightFlow | |
*/ | |
def right[L, R, Result](rightFlow: Flow[R, Result, NotUsed]): Flow[Either[L, R], Either[L, Result], NotUsed] = | |
Flow.fromGraph(GraphDSL.create() { implicit b => | |
import GraphDSL.Implicits._ | |
val eitherIn = b.add(Partition[Either[L, R]](2, { | |
case _: Right[L, R] => 0 | |
case _: Left[L, R] => 1 | |
})) | |
val result = b.add(Merge[Either[L, Result]](2)) | |
eitherIn.out(0).collect { case Right(v) => v } ~> rightFlow ~> Flow[Result].map(x => Right(x)) ~> result | |
eitherIn.out(1).collectType[Left[L, Result]] ~> result | |
FlowShape(eitherIn.in, result.out) | |
}) | |
/** | |
* Creates a Flow[Either[R, L], Result, NotUsed] that can choose between two flows based on the materialized value | |
* | |
* @param rightFlow : flow used for right values | |
* @return merge flow from either leftFlow or rightFlow | |
*/ | |
def leftRecover[L, R](leftFlow: Flow[L, R, NotUsed]): Flow[Either[L, R], R, NotUsed] = | |
EitherFlow.either( | |
leftFlow = leftFlow, | |
rightFlow = Flow[R].map(identity) | |
) | |
implicit class FlowEitherSameReturnImplicits[I, R](flow: Flow[I, Either[R, R], NotUsed]) { | |
def unwrapEither: Flow[I, R, NotUsed] = flow.map { | |
case Right(v) => v | |
case Left(v) => v | |
} | |
} | |
implicit class FlowEitherImplicits[I, L, R](flow: Flow[I, Either[L, R], NotUsed]) { | |
def viaEither[Result](left: Flow[L, Result, NotUsed], | |
right: Flow[R, Result, NotUsed]): Flow[I, Result, NotUsed] = flow.via(EitherFlow.either(left, right)) | |
def viaRight[Result](right: Flow[R, Result, NotUsed]): Flow[I, Either[L, Result], NotUsed] = flow.via(EitherFlow.right(right)) | |
def leftRecover(left: Flow[L, R, NotUsed]): Flow[I, R, NotUsed] = flow.via(EitherFlow.leftRecover(left)) | |
def mapAsyncRight[Result](parallelism: Int)(f: R => Future[Result]): Flow[I, Either[L, Result], NotUsed] = | |
flow.mapAsync(parallelism) { | |
case Left(value) => Future.successful(Left(value)) | |
case Right(value: R) => f(value).map(Right(_)) | |
} | |
def mapRight[Result](f: R => Result): Flow[I, Either[L, Result], NotUsed] = | |
flow.map { | |
case r: Right[L, R] => r.map(f) | |
case x => x.asInstanceOf[Either[L, Result]] | |
} | |
def mapLeft[LeftResult](f: L => LeftResult): Flow[I, Either[LeftResult, R], NotUsed] = | |
flow.map { | |
case Left(v) => Left(f(v)) | |
case r: Right[L, R] => r.asInstanceOf[Either[LeftResult, R]] | |
} | |
def mapAsyncUnorderedRight[Result](parallelism: Int)(f: R => Future[Result]): Flow[I, Either[L, Result], NotUsed] = | |
flow.mapAsyncUnordered(parallelism) { | |
case Left(value) => Future.successful(Left(value)) | |
case Right(value: R) => f(value).map(Right(_)) | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment