Created
November 18, 2015 16:09
-
-
Save rkuhn/e3c7e5ca71906519e3f7 to your computer and use it in GitHub Desktop.
MergeSorted
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Fan-in stage with two inlets (`left`, `right`) and one outlet (`out`) that
 * interleaves the elements of both inputs according to the implicit
 * `Ordering[T]`: of the two elements currently at hand, the smaller one is
 * emitted and the larger one is retained for the next comparison.
 *
 * The output is only sorted if both inputs are themselves sorted; this stage
 * does not verify that.
 *
 * Relies on the two-callback `read(in)(andThen, onClose)` variant and on the
 * `doPull` parameter of `passAlong`.
 */
class MergeSorted[T: Ordering] extends GraphStage[FanInShape2[T, T, T]] {
  private val left = Inlet[T]("left")
  private val right = Inlet[T]("right")
  private val out = Outlet[T]("out")

  override val shape = new FanInShape2(left, right, out)

  override def createLogic(attr: Attributes) = new GraphStageLogic(shape) {
    // Brings the `<` operator into scope for any T that has an Ordering.
    import Ordering.Implicits._

    // Default handlers; the names suggest input completion is ignored and
    // output cancellation stops the stage (NOTE(review): confirm against the
    // GraphStageLogic documentation). `read`/`emit`/`passAlong` install their
    // own handlers on top of these as the merge progresses.
    setHandler(left, ignoreTerminateInput)
    setHandler(right, ignoreTerminateInput)
    setHandler(out, eagerTerminateOutput)

    // The element that lost the most recent comparison (or the very first
    // element read from `left`); it is compared with the next element read.
    var other: T = _

    // Drops the reference held in `other` once it has been emitted.
    def nullOut(): Unit = other = null.asInstanceOf[T]

    // Emits the smaller of the two elements, keeps the larger one in `other`
    // and continues reading from the side whose element was just emitted.
    // On a tie the right element is emitted first.
    def dispatch(l: T, r: T): Unit =
      if (l < r) {
        other = r
        emit(out, l, readL)
      } else {
        other = l
        emit(out, r, readR)
      }

    // A fresh right/left element arrived while `other` holds the opposite
    // side's element. `other` is looked up when the function is invoked, not
    // when it is created.
    val dispatchR = (r: T) => dispatch(other, r)
    val dispatchL = (l: T) => dispatch(l, other)

    // Builds the continuation used when one side has closed: emit the retained
    // element, clear it, then forward everything from the `remaining` inlet to
    // `out`, pulling it right away.
    private def flushThenPass(remaining: Inlet[T]): () => Unit =
      () => emit(out, other, () => {
        nullOut()
        passAlong(remaining, out, doPull = true)
      })

    // Left closed while reading it: flush `other`, then pass along `right`.
    val passR = flushThenPass(right)
    // Right closed while reading it: flush `other`, then pass along `left`.
    val passL = flushThenPass(left)

    // Read one element from the given side; on closure switch to pass-along
    // of the opposite side.
    val readR = () => read(right)(dispatchR, passL)
    val readL = () => read(left)(dispatchL, passR)

    // Bootstraps the merge: obtain the first left element, stash it and start
    // reading right. If left closes before delivering anything, simply
    // forward right to the output.
    override def preStart(): Unit = {
      val onFirstLeft = (first: T) => {
        other = first
        readR()
      }
      val onLeftClosed = () => passAlong(right, out, doPull = true)
      read(left)(onFirstLeft, onLeftClosed)
    }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala | |
index 7d62676..01a7516 100644 | |
--- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala | |
+++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala | |
@@ -428,12 +428,19 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: | |
* for the given inlet if suspension is needed and reinstalls the current | |
* handler upon receiving the last `onPush()` signal (before invoking the `andThen` function). | |
*/ | |
- final protected def readN[T](in: Inlet[T], n: Int)(andThen: Seq[T] ⇒ Unit): Unit = | |
+ final protected def readN[T](in: Inlet[T], n: Int)(andThen: Seq[T] ⇒ Unit, onClose: Seq[T] => Unit): Unit = | |
if (n < 0) throw new IllegalArgumentException("cannot read negative number of elements") | |
else if (n == 0) andThen(Nil) | |
else { | |
val result = new ArrayBuffer[T](n) | |
var pos = 0 | |
+ def realAndThen = (elem: T) => { | |
+ result(pos) = elem | |
+ pos += 1 | |
+ if (pos == n) andThen(result) | |
+ } | |
+ def realOnClose = () => onClose(result.take(pos)) | |
+ | |
if (isAvailable(in)) { | |
val elem = grab(in) | |
result(0) = elem | |
@@ -443,20 +450,12 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: | |
pos = 1 | |
requireNotReading(in) | |
pull(in) | |
- setHandler(in, new Reading(in, n - 1, getHandler(in))(elem ⇒ { | |
- result(pos) = elem | |
- pos += 1 | |
- if (pos == n) andThen(result) | |
- })) | |
+ setHandler(in, new Reading(in, n - 1, getHandler(in))(realAndThen, realOnClose)) | |
} | |
} else { | |
requireNotReading(in) | |
if (!hasBeenPulled(in)) pull(in) | |
- setHandler(in, new Reading(in, n, getHandler(in))(elem ⇒ { | |
- result(pos) = elem | |
- pos += 1 | |
- if (pos == n) andThen(result) | |
- })) | |
+ setHandler(in, new Reading(in, n, getHandler(in))(realAndThen, realOnClose)) | |
} | |
} | |
@@ -466,14 +465,16 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: | |
* for the given inlet if suspension is needed and reinstalls the current | |
* handler upon receiving the `onPush()` signal (before invoking the `andThen` function). | |
*/ | |
- final protected def read[T](in: Inlet[T])(andThen: T ⇒ Unit): Unit = { | |
+ final protected def read[T](in: Inlet[T])(andThen: T ⇒ Unit, onClose: () => Unit): Unit = { | |
if (isAvailable(in)) { | |
val elem = grab(in) | |
andThen(elem) | |
+ } else if (isClosed(in)) { | |
+ onClose() | |
} else { | |
requireNotReading(in) | |
if (!hasBeenPulled(in)) pull(in) | |
- setHandler(in, new Reading(in, 1, getHandler(in))(andThen)) | |
+ setHandler(in, new Reading(in, 1, getHandler(in))(andThen, onClose)) | |
} | |
} | |
@@ -497,7 +498,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: | |
* Caution: for n==1 andThen is called after resetting the handler, for | |
* other values it is called without resetting the handler. | |
*/ | |
- private class Reading[T](in: Inlet[T], private var n: Int, val previous: InHandler)(andThen: T ⇒ Unit) extends InHandler { | |
+ private class Reading[T](in: Inlet[T], private var n: Int, val previous: InHandler)(andThen: T ⇒ Unit, onClose: () => Unit) extends InHandler { | |
override def onPush(): Unit = { | |
val elem = grab(in) | |
if (n == 1) setHandler(in, previous) | |
@@ -507,8 +508,15 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: | |
} | |
andThen(elem) | |
} | |
- override def onUpstreamFinish(): Unit = previous.onUpstreamFinish() | |
- override def onUpstreamFailure(ex: Throwable): Unit = previous.onUpstreamFailure(ex) | |
+ override def onUpstreamFinish(): Unit = { | |
+ setHandler(in, previous) | |
+ onClose() | |
+ previous.onUpstreamFinish() | |
+ } | |
+ override def onUpstreamFailure(ex: Throwable): Unit = { | |
+ setHandler(in, previous) | |
+ previous.onUpstreamFailure(ex) | |
+ } | |
} | |
/** | |
@@ -648,7 +656,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: | |
} | |
private class EmittingSingle[T](_out: Outlet[T], elem: T, _previous: OutHandler, _andThen: () ⇒ Unit) | |
- extends Emitting(_out, _previous, _andThen) { | |
+ extends Emitting(_out, _previous, _andThen) { | |
override def onPull(): Unit = { | |
push(out, elem) | |
@@ -657,7 +665,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: | |
} | |
private class EmittingIterator[T](_out: Outlet[T], elems: Iterator[T], _previous: OutHandler, _andThen: () ⇒ Unit) | |
- extends Emitting(_out, _previous, _andThen) { | |
+ extends Emitting(_out, _previous, _andThen) { | |
override def onPull(): Unit = { | |
push(out, elems.next()) | |
@@ -676,7 +684,9 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: | |
* given outlet before pulling for more data. `doTerminate` controls whether | |
* completion or failure of the given inlet shall lead to stage termination or not. | |
*/ | |
- final protected def passAlong[Out, In <: Out](from: Inlet[In], to: Outlet[Out], doFinish: Boolean, doFail: Boolean): Unit = | |
+ final protected def passAlong[Out, In <: Out](from: Inlet[In], to: Outlet[Out], | |
+ doFinish: Boolean = true, doFail: Boolean = true, | |
+ doPull: Boolean = false): Unit = { | |
setHandler(from, new InHandler { | |
val puller = () ⇒ tryPull(from) | |
override def onPush(): Unit = { | |
@@ -686,6 +696,8 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount: | |
override def onUpstreamFinish(): Unit = if (doFinish) super.onUpstreamFinish() | |
override def onUpstreamFailure(ex: Throwable): Unit = if (doFail) super.onUpstreamFailure(ex) | |
}) | |
+ if (doPull) tryPull(from) | |
+ } | |
/** | |
* Obtain a callback object that can be used asynchronously to re-enter the |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The patch is a minor touch-up over 2.0-M1 that allows `read` to register an `onClose` callback.