Skip to content

Instantly share code, notes, and snippets.

@atamborrino
Last active February 6, 2017 15:07
Show Gist options
  • Save atamborrino/c402e1f4ac349bb9fa15 to your computer and use it in GitHub Desktop.
Save atamborrino/c402e1f4ac349bb9fa15 to your computer and use it in GitHub Desktop.
Conflate and/or expand a stream according to an user-defined functon (Akka Stream)
/**
* Conflate and/or expand the stream with an user-defined function
* @param zero initial state
* @param f take current state and current elem, returns a seq of C elements to push downstream and the next state b
* if we want the stream to continue (if no new state b, the stream ends).
* @param lastPushIfUpstreamEnds if the upstream ends (before customStatefulProcessor decides to end the stream),
* this function is called on the last b state and the resulting c elements
* are pushed downstream as the last elements of the stream.
* @return
*/
def customStatefulProcessor[A, B, C](zero: B)
(f: (B, A) => (Option[B], IndexedSeq[C]),
lastPushIfUpstreamEnds: B => IndexedSeq[C] = {_: B => IndexedSeq.empty}): Flow[A, C] = {
val stage = new PushPullStage[A, C] {
private var state = zero
private var buffer = Vector.empty[C]
private var finishing = false
override def onPush(elem: A, ctx: Context[C]): Directive = {
f(state, elem) match {
case (Some(b), cs) =>
state = b
buffer ++= cs
emitChunkOrPull(ctx)
case (None, cs) =>
buffer ++= cs
finishing = true
emitChunkOrPull(ctx)
}
}
override def onPull(ctx: Context[C]): Directive = emitChunkOrPull(ctx)
private def emitChunkOrPull(ctx: Context[C]): Directive = {
if (finishing) { // customProcessor is ending
buffer match {
case Seq() => ctx.finish()
case elem +: nextBuffer =>
buffer = nextBuffer
ctx.push(elem)
}
} else if (ctx.isFinishing) { // upstream ended
buffer match {
case Seq() =>
lastPushIfUpstreamEnds(state) match {
case Seq() => ctx.finish()
case elem +: nextBuffer =>
finishing = true
buffer = nextBuffer.toVector
ctx.push(elem)
}
case elem +: nextBuffer =>
buffer = nextBuffer
ctx.push(elem)
}
} else {
ctx.pull()
}
}
override def onUpstreamFinish(ctx: Context[C]): TerminationDirective = ctx.absorbTermination()
}
Flow[A].transform(() => stage)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment