Last active
January 12, 2017 10:55
-
-
Save rkrzewski/a0fc5d0b47d9a3e0b2c81435adef3fe7 to your computer and use it in GitHub Desktop.
Splitter variant that handles demand and termination of the outlets individually.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.stream._ | |
import akka.stream.stage._ | |
/**
 * Fan-out the stream of `scala.util.Either[L, R]` elements to `L` and `R` streams.
 *
 * At most one element is held back inside the stage: when an element arrives for an
 * outlet that currently has no demand, it is parked until that outlet pulls or cancels,
 * and no further element is requested from upstream in the meantime.
 *
 * '''Emits when''' an element is available from the input and the chosen output has demand
 *
 * '''Backpressures when''' the currently chosen output back-pressures
 *
 * '''Completes when''' upstream completes and no output is pending
 *
 * '''Cancels when''' both downstreams cancel
 *
 * @tparam L type of the elements routed to the `left` outlet
 * @tparam R type of the elements routed to the `right` outlet
 */
class Splitter[L, R] extends GraphStage[FanOutShape2[Either[L, R], L, R]] {
  val in: Inlet[Either[L, R]] = Inlet("in")
  val left: Outlet[L] = Outlet("left")
  val right: Outlet[R] = Outlet("right")

  override val shape: FanOutShape2[Either[L, R], L, R] = new FanOutShape2(in, left, right)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // Element grabbed from `in` that is waiting for demand on `pendingOutlet`.
      // `null` means "nothing parked"; both fields are always set and cleared together.
      private var pendingElem: Any = null
      private var pendingOutlet: Outlet[_] = null

      // Number of outlets whose downstream has not cancelled yet.
      private var downstreamRunning = 2

      /** Forget the parked element (after it has been delivered or dropped). */
      private def clearPending(): Unit = {
        pendingElem = null
        pendingOutlet = null
      }

      /**
       * Called once the parked element is gone: finish the stage if upstream has already
       * finished (nothing more can arrive), otherwise ask upstream for the next element.
       */
      private def resumeAfterPending(): Unit =
        if (isClosed(in)) completeStage()
        else if (!hasBeenPulled(in)) pull(in)

      class SplitterInHandler extends InHandler {

        /** Request the next element only if at least one outlet has outstanding demand. */
        def maybePull(): Unit =
          if ((isAvailable(left) || isAvailable(right)) && !isClosed(in) && !hasBeenPulled(in)) {
            pull(in)
          }

        /**
         * Route `elem` to `outlet`: push it when there is demand, park it when there is
         * none, and drop it when the outlet's downstream has already cancelled.
         */
        def handlePush[T](elem: T, outlet: Outlet[T]): Unit =
          if (isClosed(outlet)) {
            // Nobody listens on this side any more: drop the element and carry on.
            maybePull()
          } else if (isAvailable(outlet)) {
            push(outlet, elem)
            maybePull()
          } else {
            // No demand on the chosen outlet: park the element and back-pressure upstream.
            pendingElem = elem
            pendingOutlet = outlet
          }

        override def onPush(): Unit =
          grab(in) match {
            case Left(elem)  => handlePush(elem, left)
            case Right(elem) => handlePush(elem, right)
          }

        override def onUpstreamFinish(): Unit =
          // With an element still parked the stage stays alive until that element is
          // delivered (onPull) or its outlet cancels (onDownstreamFinish).
          if (pendingElem == null) {
            completeStage()
          }
      }

      class SplitterOutHandler[T](outlet: Outlet[T]) extends OutHandler {

        override def onPull(): Unit =
          if (pendingElem != null && pendingOutlet == outlet) {
            // The parked element was waiting for exactly this demand.
            push(outlet, pendingElem.asInstanceOf[T])
            clearPending()
            resumeAfterPending()
          } else if (pendingElem == null && !isClosed(in) && !hasBeenPulled(in)) {
            pull(in)
          }
          // Otherwise an element is parked for the other outlet: do not pull, because a
          // further element for that outlet would overwrite the parked one (and `in` may
          // already be closed). This outlet's demand stays registered and is honoured as
          // soon as an element for it arrives.

        override def onDownstreamFinish(): Unit = {
          downstreamRunning -= 1
          if (downstreamRunning == 0) {
            completeStage()
          } else if (pendingElem != null && pendingOutlet == outlet) {
            // The parked element can never be delivered: drop it and either resume
            // pulling or, if upstream is already finished, complete the stage.
            clearPending()
            resumeAfterPending()
          }
        }
      }

      setHandler(in, new SplitterInHandler)
      setHandler(left, new SplitterOutHandler(left))
      setHandler(right, new SplitterOutHandler(right))
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment