Skip to content

Instantly share code, notes, and snippets.

@gabfssilva
Created April 6, 2019 18:30
Show Gist options
  • Save gabfssilva/4489d6961d9e3cb5ea5cc5ec922ce861 to your computer and use it in GitHub Desktop.
Save gabfssilva/4489d6961d9e3cb5ea5cc5ec922ce861 to your computer and use it in GitHub Desktop.
EitherFlow
package utils
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
/** Singleton instance of the [[EitherFlow]] trait, so its members can be reached as `EitherFlow.either(...)` or brought into scope with `import EitherFlow._`. */
object EitherFlow extends EitherFlow
trait EitherFlow {

  /**
    * Creates a `Flow[Either[L, R], Result, NotUsed]` that routes every incoming element to one of
    * two flows depending on whether the element is a `Left` or a `Right`, and merges both outputs.
    *
    * The outputs of the two branches are combined with a `Merge` stage, so the relative order of
    * results coming from different branches is whatever `Merge` emits; it is not tied to the
    * order of the input elements.
    *
    * @param leftFlow  flow applied to the payload of `Left` elements
    * @param rightFlow flow applied to the payload of `Right` elements
    * @tparam L      payload type of `Left` elements
    * @tparam R      payload type of `Right` elements
    * @tparam Result common output type of both branches
    * @return a flow emitting the merged output of `leftFlow` and `rightFlow`
    */
  def either[L, R, Result](leftFlow: Flow[L, Result, NotUsed],
                           rightFlow: Flow[R, Result, NotUsed]): Flow[Either[L, R], Result, NotUsed] =
    Flow.fromGraph(GraphDSL.create() { implicit b =>
      import GraphDSL.Implicits._

      // Output port 0 receives the Right elements, output port 1 the Left elements.
      val eitherIn = b.add(Partition[Either[L, R]](2, {
        case Right(_) => 0
        case Left(_)  => 1
      }))
      val result = b.add(Merge[Result](2))

      // Each port only ever sees one side, so `collect` just unwraps the payload.
      eitherIn.out(0).collect { case Right(v) => v } ~> rightFlow ~> result
      eitherIn.out(1).collect { case Left(v) => v } ~> leftFlow ~> result

      FlowShape(eitherIn.in, result.out)
    })

  /**
    * Creates a `Flow[Either[L, R], Either[L, Result], NotUsed]` that sends the payload of `Right`
    * elements through `rightFlow` (re-wrapping each output in a `Right`) and lets `Left` elements
    * pass by untouched.
    *
    * Implemented on top of [[either]], so the same ordering caveat applies: `Left` elements bypass
    * `rightFlow` and are merged back in with its output.
    *
    * @param rightFlow flow applied to the payload of `Right` elements
    * @tparam L      payload type of `Left` elements (unchanged by this flow)
    * @tparam R      payload type of incoming `Right` elements
    * @tparam Result payload type of outgoing `Right` elements
    * @return a flow emitting `Left` values as they came in and `Right` values produced by `rightFlow`
    */
  def right[L, R, Result](rightFlow: Flow[R, Result, NotUsed]): Flow[Either[L, R], Either[L, Result], NotUsed] =
    EitherFlow.either[L, R, Either[L, Result]](
      leftFlow = Flow[L].map[Either[L, Result]](Left(_)),
      rightFlow = rightFlow.map[Either[L, Result]](Right(_))
    )

  /**
    * Creates a `Flow[Either[L, R], R, NotUsed]` that turns `Left` payloads into `R` values through
    * `leftFlow` and emits `Right` payloads as they are.
    *
    * @param leftFlow flow used to convert the payload of `Left` elements into an `R`
    * @tparam L payload type of `Left` elements
    * @tparam R payload type of `Right` elements and output type of the resulting flow
    * @return a flow emitting the merged output of `leftFlow` and the unwrapped `Right` payloads
    */
  def leftRecover[L, R](leftFlow: Flow[L, R, NotUsed]): Flow[Either[L, R], R, NotUsed] =
    EitherFlow.either(
      leftFlow = leftFlow,
      rightFlow = Flow[R].map(identity)
    )

  /** Extension methods for flows emitting an `Either` whose two sides have the same type. */
  implicit class FlowEitherSameReturnImplicits[I, R](flow: Flow[I, Either[R, R], NotUsed]) {

    /** Emits the payload of each element, regardless of whether it was a `Left` or a `Right`. */
    def unwrapEither: Flow[I, R, NotUsed] = flow.map(_.merge)
  }

  /** Extension methods for flows emitting `Either[L, R]` elements. */
  implicit class FlowEitherImplicits[I, L, R](flow: Flow[I, Either[L, R], NotUsed]) {

    /** Appends [[EitherFlow.either]]: `Left` payloads go through `left`, `Right` payloads through `right`. */
    def viaEither[Result](left: Flow[L, Result, NotUsed],
                          right: Flow[R, Result, NotUsed]): Flow[I, Result, NotUsed] =
      flow.via(EitherFlow.either(left, right))

    /** Appends [[EitherFlow.right]]: `Right` payloads go through `right`, `Left` elements are kept. */
    def viaRight[Result](right: Flow[R, Result, NotUsed]): Flow[I, Either[L, Result], NotUsed] =
      flow.via(EitherFlow.right(right))

    /** Appends [[EitherFlow.leftRecover]]: `Left` payloads are converted to `R` through `left`. */
    def leftRecover(left: Flow[L, R, NotUsed]): Flow[I, R, NotUsed] =
      flow.via(EitherFlow.leftRecover(left))

    /**
      * Applies the asynchronous function `f` to the payload of `Right` elements using `mapAsync`;
      * `Left` elements are passed along as already-completed futures.
      *
      * @param parallelism forwarded to `mapAsync`
      * @param f           asynchronous transformation of `Right` payloads
      */
    def mapAsyncRight[Result](parallelism: Int)(f: R => Future[Result]): Flow[I, Either[L, Result], NotUsed] =
      flow.mapAsync(parallelism)(liftRight(f))

    /** Applies `f` to the payload of `Right` elements; `Left` elements are left untouched. */
    def mapRight[Result](f: R => Result): Flow[I, Either[L, Result], NotUsed] =
      flow.map(_.map(f))

    /** Applies `f` to the payload of `Left` elements; `Right` elements are left untouched. */
    def mapLeft[LeftResult](f: L => LeftResult): Flow[I, Either[LeftResult, R], NotUsed] =
      flow.map(_.left.map(f))

    /**
      * Same as [[mapAsyncRight]] but built on `mapAsyncUnordered`.
      *
      * @param parallelism forwarded to `mapAsyncUnordered`
      * @param f           asynchronous transformation of `Right` payloads
      */
    def mapAsyncUnorderedRight[Result](parallelism: Int)(f: R => Future[Result]): Flow[I, Either[L, Result], NotUsed] =
      flow.mapAsyncUnordered(parallelism)(liftRight(f))

    /**
      * Lifts `f` so it can be applied to a whole `Either[L, R]`: `Right` payloads are transformed
      * by `f` and re-wrapped, `Left` values become already-completed futures.
      */
    private def liftRight[Result](f: R => Future[Result]): Either[L, R] => Future[Either[L, Result]] = {
      case Left(value) => Future.successful[Either[L, Result]](Left(value))
      // NOTE(review): re-wrapping the result needs an implicit ExecutionContext, which is resolved
      // from this file's imports rather than supplied by the caller.
      case Right(value) => f(value).map[Either[L, Result]](Right(_))
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment