Last active
October 16, 2017 12:19
-
-
Save francisdb/a31100c18129f1bab75efe87945afffa to your computer and use it in GitHub Desktop.
Akka streams utilities
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.NotUsed | |
import akka.stream.scaladsl.{Flow, Source} ` | |
import akka.stream.{Attributes, FlowShape, Inlet, Outlet} | |
import akka.stream.Attributes._ | |
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler} | |
import scala.collection.immutable.Seq | |
/**
  * Small helpers for composing Akka Streams flows.
  */
object AkkaStreams {

  /**
    * Lifts a Flow so that it can be applied to a stream of sequences.
    *
    * Every incoming sequence is streamed through its own materialization of
    * `flow` and the results are collected, in order, into a single output
    * sequence. Exactly one output sequence is emitted per input sequence: if the
    * input sequence is empty, or `flow` drops all of its elements, an empty
    * sequence is emitted instead of silently skipping it.
    *
    * @param flow the element-level flow to lift; it is materialized once per
    *             incoming sequence and its materialized value is discarded
    * @tparam I element type of the incoming sequences
    * @tparam O element type of the outgoing sequences
    * @return a flow transforming whole sequences element by element
    */
  def liftToSeq[I, O](flow: Flow[I, O, _]): Flow[Seq[I], Seq[O], NotUsed] =
    Flow[Seq[I]].flatMapConcat { ss =>
      // `fold` emits its zero value when the inner stream is empty, whereas
      // `grouped` would emit nothing and the sequence would vanish downstream.
      Source(ss).via(flow).fold[Seq[O]](Vector.empty[O])(_ :+ _)
    }

  /** Example flow: maps every string to its length. */
  val stringLength: Flow[String, Int, NotUsed] = Flow[String].map(_.length)

  /** Example of lifting: maps every sequence of strings to the sequence of their lengths. */
  val stringLengthSeq: Flow[Seq[String], Seq[Int], NotUsed] = liftToSeq(stringLength)
}
/**
  * Groups consecutive elements that share the same key into one sequence.
  *
  * Only adjacent elements are compared, so the incoming stream is expected to
  * be ordered by the key; a key that re-appears later starts a new group.
  *
  * @param p the key extractor
  */
final case class GroupAdjacent[T, A](p: T => A) extends GraphStage[FlowShape[T, Seq[T]]] {
  private val in = Inlet[T]("GroupedBySequential.in")
  private val out = Outlet[Seq[T]]("GroupedBySequential.out")

  override val shape: FlowShape[T, Seq[T]] = FlowShape.of(in, out)

  override protected val initialAttributes: Attributes = name("groupedBySequential")

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with OutHandler {
      // Key of the group currently being accumulated; None until the first element arrives.
      private var lastKey: Option[A] = None
      // Elements of the group currently being accumulated.
      // TODO should we set a size hint on the builder?
      private val buf = Vector.newBuilder[T]

      override def onPush(): Unit = {
        val element = grab(in)
        val elementKey = p(element)
        lastKey match {
          case None =>
            // Very first element: it opens the first group.
            lastKey = Some(elementKey)
            buf += element
            pull(in)
          case Some(previous) if previous == elementKey =>
            // Same key as the running group: keep accumulating and ask for more.
            buf += element
            pull(in)
          case Some(_) =>
            // Key changed: close the running group, start the next one with this
            // element and hand the finished group downstream.
            lastKey = Some(elementKey)
            val finished = buf.result()
            buf.clear()
            buf += element
            push(out, finished)
        }
      }

      // Downstream demand is simply forwarded upstream.
      override def onPull(): Unit = pull(in)

      override def onUpstreamFinish(): Unit = {
        // Upstream is done, so whatever is still buffered forms the last group.
        val remaining = buf.result()
        if (remaining.nonEmpty) {
          buf.clear()
          // emit rather than push: out might not yet be ready to receive
          emit(out, remaining)
        }
        completeStage()
      }

      setHandlers(in, out, this)
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment