Last active
February 6, 2017 15:07
-
-
Save atamborrino/c402e1f4ac349bb9fa15 to your computer and use it in GitHub Desktop.
Conflate and/or expand a stream according to a user-defined function (Akka Stream)
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Conflate and/or expand the stream with a user-defined stateful function.
 *
 * For every upstream element, `f` is applied to the current state and that element. The
 * elements it returns are queued and pushed downstream one at a time; upstream is pulled
 * again only once the queue is empty.
 *
 * @tparam A type of the upstream elements
 * @tparam B type of the internal state
 * @tparam C type of the elements pushed downstream
 * @param zero initial state
 * @param f takes the current state and the current element, and returns the next state
 *          together with the elements to push downstream. Returning `Some(b)` continues
 *          the stream with state `b`; returning `None` ends the stream once the returned
 *          elements have been pushed.
 * @param lastPushIfUpstreamEnds if the upstream ends before `f` decides to end the stream,
 *                               this function is called on the last state and the resulting
 *                               elements are pushed downstream as the last elements of the
 *                               stream. By default nothing extra is pushed.
 * @return a flow that transforms `A` elements into `C` elements as described above
 */
def customStatefulProcessor[A, B, C](zero: B)
                                    (f: (B, A) => (Option[B], IndexedSeq[C]),
                                     lastPushIfUpstreamEnds: B => IndexedSeq[C] = {_: B => IndexedSeq.empty}): Flow[A, C] = {
  // A fresh stage is built on every factory call so that the mutable fields below are
  // never shared between two materializations of the returned flow.
  def newStage(): PushPullStage[A, C] = new PushPullStage[A, C] {
    private var state = zero
    // Elements produced by `f` (or `lastPushIfUpstreamEnds`) that are not yet pushed downstream.
    private var buffer = Vector.empty[C]
    // True once no more elements will be produced: the stream ends when `buffer` is drained.
    private var finishing = false

    override def onPush(elem: A, ctx: Context[C]): Directive = {
      val (nextState, produced) = f(state, elem)
      buffer ++= produced
      nextState match {
        case Some(b) => state = b
        case None => finishing = true // `f` asked to end the stream
      }
      emitChunkOrPull(ctx)
    }

    override def onPull(ctx: Context[C]): Directive = emitChunkOrPull(ctx)

    // Pushes the next buffered element if there is one; otherwise finishes, flushes the
    // final elements, or asks upstream for more, depending on the termination state.
    private def emitChunkOrPull(ctx: Context[C]): Directive =
      buffer match {
        case elem +: rest =>
          // Always drain pending elements first, so output is not held back until termination.
          buffer = rest
          ctx.push(elem)
        case _ if finishing => // nothing left to emit and the processor is ending
          ctx.finish()
        case _ if ctx.isFinishing => // upstream ended: emit the final elements, if any
          lastPushIfUpstreamEnds(state) match {
            case elem +: rest =>
              finishing = true // ensures lastPushIfUpstreamEnds is not called again
              buffer = rest.toVector
              ctx.push(elem)
            case _ => ctx.finish()
          }
        case _ =>
          ctx.pull()
      }

    // Keep the stage alive after upstream completion so the remaining elements can be pushed.
    override def onUpstreamFinish(ctx: Context[C]): TerminationDirective = ctx.absorbTermination()
  }

  Flow[A].transform(() => newStage())
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment