Last active
July 8, 2016 14:31
-
-
Save patriknw/65e94e0913db450fb0ea2da4c3e2d846 to your computer and use it in GitHub Desktop.
Blog: GraphStage emit and friends
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package blog | |
import akka.stream.Attributes | |
import akka.stream.FlowShape | |
import akka.stream.Inlet | |
import akka.stream.Outlet | |
import akka.stream.stage.GraphStage | |
import akka.stream.stage.GraphStageLogic | |
import akka.stream.stage.InHandler | |
import akka.stream.stage.OutHandler | |
class MaxStage extends GraphStage[FlowShape[Int, Int]] { | |
val in: Inlet[Int] = Inlet("MaxStage.in") | |
val out: Outlet[Int] = Outlet("MaxStage.out") | |
override val shape: FlowShape[Int, Int] = FlowShape(in, out) | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = | |
new GraphStageLogic(shape) { | |
var maxValue = Int.MinValue | |
var maxPushed = Int.MinValue | |
setHandler(in, new InHandler { | |
override def onPush(): Unit = { | |
maxValue = math.max(maxValue, grab(in)) | |
if (isAvailable(out) && maxValue > maxPushed) | |
pushMaxValue() | |
pull(in) | |
} | |
override def onUpstreamFinish(): Unit = { | |
if (maxValue > maxPushed) | |
pushMaxValue() | |
completeStage() | |
} | |
}) | |
setHandler(out, new OutHandler { | |
override def onPull(): Unit = { | |
if (maxValue > maxPushed) | |
pushMaxValue() | |
else if (!hasBeenPulled(in)) | |
pull(in) | |
} | |
}) | |
def pushMaxValue(): Unit = { | |
maxPushed = maxValue | |
push(out, maxPushed) | |
} | |
} | |
} | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package blog | |
import akka.stream.Attributes | |
import akka.stream.FlowShape | |
import akka.stream.Inlet | |
import akka.stream.Outlet | |
import akka.stream.stage.GraphStage | |
import akka.stream.stage.GraphStageLogic | |
import akka.stream.stage.InHandler | |
import akka.stream.stage.OutHandler | |
class MaxStage2 extends GraphStage[FlowShape[Int, Int]] { | |
val in: Inlet[Int] = Inlet("MaxStage.in") | |
val out: Outlet[Int] = Outlet("MaxStage.out") | |
override val shape: FlowShape[Int, Int] = FlowShape(in, out) | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = | |
new GraphStageLogic(shape) { | |
var maxValue = Int.MinValue | |
var maxPushed = Int.MinValue | |
var finishing = false | |
setHandler(in, new InHandler { | |
override def onPush(): Unit = { | |
maxValue = math.max(maxValue, grab(in)) | |
if (isAvailable(out) && maxValue > maxPushed) | |
pushMaxValue() | |
pull(in) | |
} | |
override def onUpstreamFinish(): Unit = { | |
if (maxValue > maxPushed) { | |
if (isAvailable(out)) { | |
pushMaxValue() | |
completeStage() | |
} else { | |
// push final value and complete stage in onPull | |
finishing = true | |
} | |
} else { | |
completeStage() | |
} | |
} | |
}) | |
setHandler(out, new OutHandler { | |
override def onPull(): Unit = { | |
if (maxValue > maxPushed) { | |
pushMaxValue() | |
if (finishing) | |
completeStage() | |
} else if (!hasBeenPulled(in)) | |
pull(in) | |
} | |
}) | |
def pushMaxValue(): Unit = { | |
maxPushed = maxValue | |
push(out, maxPushed) | |
} | |
} | |
} | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package blog | |
import akka.stream.Attributes | |
import akka.stream.FlowShape | |
import akka.stream.Inlet | |
import akka.stream.Outlet | |
import akka.stream.stage.GraphStage | |
import akka.stream.stage.GraphStageLogic | |
import akka.stream.stage.InHandler | |
import akka.stream.stage.OutHandler | |
class MaxStage3 extends GraphStage[FlowShape[Int, Int]] { | |
val in: Inlet[Int] = Inlet("MaxStage.in") | |
val out: Outlet[Int] = Outlet("MaxStage.out") | |
override val shape: FlowShape[Int, Int] = FlowShape(in, out) | |
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = | |
new GraphStageLogic(shape) { | |
var maxValue = Int.MinValue | |
var maxPushed = Int.MinValue | |
setHandler(in, new InHandler { | |
override def onPush(): Unit = { | |
maxValue = math.max(maxValue, grab(in)) | |
if (isAvailable(out) && maxValue > maxPushed) | |
pushMaxValue() | |
pull(in) | |
} | |
override def onUpstreamFinish(): Unit = { | |
if (maxValue > maxPushed) | |
emit(out, maxValue) | |
completeStage() | |
} | |
}) | |
setHandler(out, new OutHandler { | |
override def onPull(): Unit = { | |
if (maxValue > maxPushed) | |
pushMaxValue() | |
else if (!hasBeenPulled(in)) | |
pull(in) | |
} | |
}) | |
def pushMaxValue(): Unit = { | |
maxPushed = maxValue | |
push(out, maxPushed) | |
} | |
} | |
} | |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package blog | |
import scala.concurrent.Await | |
import scala.concurrent.duration._ | |
import akka.actor.ActorSystem | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl.Keep | |
import akka.stream.testkit.scaladsl.TestSink | |
import akka.stream.testkit.scaladsl.TestSource | |
import org.scalatest.BeforeAndAfterAll | |
import org.scalatest.Matchers | |
import org.scalatest.WordSpec | |
class MaxStageSpec extends WordSpec with Matchers with BeforeAndAfterAll { | |
implicit val system = ActorSystem("MaxLimitSpec") | |
implicit val mat = ActorMaterializer() | |
"MaxStage" should { | |
"emit max value as known so far" in { | |
val (upstream, downstream) = | |
TestSource.probe[Int] | |
.via(new MaxStage3) | |
.toMat(TestSink.probe)(Keep.both) | |
.run() | |
// send element 10 from upstream | |
upstream.sendNext(10) | |
downstream.request(1) | |
// and it is received by downstream | |
downstream.expectNext(10) | |
downstream.request(1) | |
upstream.sendNext(9) | |
upstream.sendNext(8) | |
// no new max yet since 9 and 8 are < 10 | |
downstream.expectNoMsg(200.millis) | |
upstream.sendNext(11) | |
// new max emitted by the stage | |
downstream.expectNext(11) | |
upstream.sendNext(17) | |
// end the stream | |
upstream.sendComplete() | |
// no request from downstream yet | |
downstream.expectNoMsg(200.millis) | |
downstream.request(1) | |
// get the final element | |
downstream.expectNext(17) | |
downstream.expectComplete() | |
} | |
} | |
override protected def afterAll() = { | |
Await.ready(system.terminate(), 10.seconds) | |
super.afterAll() | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment