Created
June 13, 2018 15:22
-
-
Save patriknw/400f02cfdb5f03e66ca3c9c2f226390a to your computer and use it in GitHub Desktop.
Prototype LinearStage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/** | |
* Copyright (C) 2018 Lightbend Inc. <http://www.lightbend.com> | |
*/ | |
package akka.stream.stage | |
import scala.collection.immutable | |
import scala.concurrent.duration.FiniteDuration | |
import akka.NotUsed | |
import akka.annotation.InternalApi | |
import akka.event.LoggingAdapter | |
import akka.stream.Attributes | |
import akka.stream.FlowShape | |
import akka.stream.Graph | |
import akka.stream.Inlet | |
import akka.stream.Outlet | |
import akka.stream.impl.Buffer | |
object LinearStage { | |
def flow[In, Out, M](factory: () => LinearStageLogic[In, Out, M]): Graph[FlowShape[In, Out], M] = | |
new LinearGraphStageImpl(factory) | |
} | |
abstract class LinearStageLogic[In, Out, M](shape: FlowShape[In, Out]) | |
extends TimerGraphStageLogic(shape) | |
with InHandler with OutHandler { | |
// it's ok to create a new FlowShape(in, Out) here because constructor in GraphStageLogic is only using | |
// the size of inlets and outlets. The actual in and out are set via `internalSetInOut` | |
def this() = this(FlowShape(Inlet[In]("in"), Outlet[Out]("out"))) | |
private var in: Inlet[In] = _ | |
private var out: Outlet[Out] = _ | |
// Could use GraphStageLogic.emit instead of own buffer, but I think there should be a limit | |
private var outBuffer: Buffer[Out] = _ | |
def internalSetInOut(i: Inlet[In], o: Outlet[Out]): Unit = { | |
in = i | |
out = o | |
} | |
/** | |
* Called when the input port has a new element available. The actual element can be retrieved via the | |
* [[GraphStageLogic.grab()]] method. | |
*/ | |
@throws(classOf[Exception]) | |
def onPush(elem: In): Unit | |
def outBufferSize: Int | |
def materializedValue: M | |
override def preStart(): Unit = { | |
outBuffer = Buffer(outBufferSize, materializer) | |
} | |
override def onPush(): Unit = { | |
onPush(grab(in)) | |
if (isAvailable(out)) | |
tryPull(in) // in case it was filtered, and not emitting anything | |
} | |
override def onPull(): Unit = { | |
if (outBuffer.isEmpty) { | |
if (isClosed(in)) | |
completeStage() | |
else | |
tryPull(in) | |
} else | |
push(out, outBuffer.dequeue()) | |
} | |
final def emit(elem: Out): Unit = { | |
if (outBuffer.isEmpty) | |
push(out, elem) | |
else | |
outBuffer.enqueue(elem) | |
// FIXME outBuffer.isFull | |
// FIXME if (isClosed(out)) | |
} | |
final def emitMultiple(elems: immutable.Iterable[Out]): Unit = { | |
if (elems.nonEmpty) { | |
val iter = elems.iterator | |
if (outBuffer.isEmpty) | |
push(out, iter.next()) | |
while (iter.hasNext) | |
outBuffer.enqueue(iter.next()) | |
// FIXME outBuffer.isFull | |
// FIXME if (isClosed(out)) | |
} | |
} | |
def isOutBufferFull: Boolean = | |
outBuffer.isFull | |
override def postStop(): Unit = { | |
outBuffer.clear() | |
super.postStop() | |
} | |
override def onUpstreamFinish(): Unit = { | |
if (outBuffer.isEmpty) | |
super.onUpstreamFinish() | |
} | |
override def onUpstreamFailure(ex: Throwable): Unit = { | |
if (outBuffer.isEmpty) | |
super.onUpstreamFailure(ex) | |
} | |
} | |
/** | |
* INTERNAL API | |
*/ | |
@InternalApi private[akka] class LinearGraphStageImpl[In, Out, M](factory: () => LinearStageLogic[In, Out, M]) | |
extends GraphStageWithMaterializedValue[FlowShape[In, Out], M] { | |
override val shape = FlowShape(Inlet[In]("in"), Outlet[Out]("out")) | |
override def initialAttributes: Attributes = Attributes.name("LinearStage") // FIXME use DefaultAttributes | |
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, M) = { | |
val logic = factory() | |
logic.internalSetInOut(shape.in, shape.out) | |
(logic, logic.materializedValue) | |
} | |
} | |
object UsageDemo { | |
class Map[A, B](f: A ⇒ B) extends LinearStageLogic[A, B, NotUsed] { | |
override def onPush(elem: A): Unit = { | |
emit(f(elem)) | |
} | |
override def outBufferSize: Int = 0 | |
override def materializedValue: NotUsed = NotUsed | |
} | |
class Filter[A](p: A ⇒ Boolean) extends LinearStageLogic[A, A, NotUsed] { | |
override def onPush(elem: A): Unit = { | |
if (p(elem)) | |
emit(elem) | |
} | |
override def outBufferSize: Int = 0 | |
override def materializedValue: NotUsed = NotUsed | |
} | |
class Duplicator[A] extends LinearStageLogic[A, A, NotUsed] { | |
override def onPush(elem: A): Unit = { | |
emitMultiple(List(elem, elem)) | |
} | |
override def outBufferSize: Int = 1 | |
override def materializedValue: NotUsed = NotUsed | |
} | |
// each time an event is pushed through it will trigger a period of silence | |
class TimedGate[A](silencePeriod: FiniteDuration) extends LinearStageLogic[A, A, NotUsed] { | |
var open = false | |
override def onPush(elem: A): Unit = { | |
if (!open) { | |
emit(elem) | |
open = true | |
scheduleOnce(None, silencePeriod) | |
} | |
} | |
override def onTimer(timerKey: Any): Unit = { | |
open = false | |
} | |
override def materializedValue: NotUsed = NotUsed | |
override def outBufferSize: Int = 0 | |
} | |
//https://akka.io/blog/2016/10/21/emit-and-friends | |
class Max extends LinearStageLogic[Int, Int, NotUsed] { | |
var maxValue = Int.MinValue | |
var maxPushed = Int.MinValue | |
override def onPush(elem: Int): Unit = { | |
maxValue = math.max(maxValue, elem) | |
if (maxValue > maxPushed) { | |
maxPushed = maxValue | |
emit(maxPushed) | |
} | |
} | |
override def materializedValue: NotUsed = NotUsed | |
override def outBufferSize: Int = 0 | |
} | |
import akka.stream.scaladsl.Source | |
import akka.stream.scaladsl.Sink | |
import akka.stream.scaladsl.Flow | |
import akka.stream.Materializer | |
implicit val mat: Materializer = ??? | |
val resultFuture = Source(1 to 5) | |
.via(LinearStage.flow(() => new Filter(_ % 2 == 0))) | |
.via(LinearStage.flow(() => new Duplicator())) | |
.via(LinearStage.flow(() => new Map(_ / 2))) | |
.runWith(Sink.ignore) | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Looking good! I would like to try and remove
from the user facing API as well.
Also a default consctructor for
LinearStageLogic
that requires a stage name would be great to enforce adding a name to every custom stage.