Created
January 11, 2017 23:30
-
-
Save rkrzewski/f1d131405ddb8ce0d1a6fded55da8c23 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.stream._ | |
import akka.stream.stage._ | |
/** | |
* A simple fan-out stage that will split incoming stream according to the results of a decider function. | |
* Outgoing streams may carry elements of different types. | |
* | |
* '''Emits when''' emits when an element is available from the input and both downstreams have demand | |
* | |
* '''Backpressures when''' any of the downstreams back-pressures | |
* | |
* '''Completes when''' upstream completes | |
* | |
* '''Cancels when''' any of the downstreams cancel | |
* | |
*/ | |
class Splitter[I, O0, O1](decide: I => Either[O0, O1]) extends GraphStage[FanOutShape2[I, O0, O1]] { | |
val in: Inlet[I] = Inlet("in") | |
val out0: Outlet[O0] = Outlet("out0") | |
val out1: Outlet[O1] = Outlet("out1") | |
override val shape = new FanOutShape2(in, out0, out1) | |
def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { | |
// number of available outlets | |
var ready: Int = 0 | |
// handers for both outlets are identical | |
val outHandler = new OutHandler { | |
override def onPull(): Unit = { | |
ready += 1 | |
if (ready == 2) | |
pull(in) | |
} | |
} | |
setHandler(out0, outHandler) | |
setHandler(out1, outHandler) | |
setHandler(in, new InHandler { | |
override def onPush(): Unit = { | |
decide(grab(in)) match { | |
case Left(o0) => push(out0, o0) | |
case Right(o1) => push(out1, o1) | |
} | |
ready -= 1 | |
} | |
}) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment