Skip to content

Instantly share code, notes, and snippets.

Last active October 16, 2017 12:19
Show Gist options
  • Save francisdb/a31100c18129f1bab75efe87945afffa to your computer and use it in GitHub Desktop.
Save francisdb/a31100c18129f1bab75efe87945afffa to your computer and use it in GitHub Desktop.
Akka streams utilities
import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import akka.stream.{Attributes, FlowShape, Inlet, Outlet}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler, OutHandler}
import scala.collection.immutable.Seq
object AkkaStreams {

  /**
    * Lifts a Flow so that it can be applied to a stream of sequences.
    *
    * Every incoming sequence is run through `flow` as its own sub-stream and the
    * elements that come out are collected, in order, into one outgoing sequence.
    * An empty incoming sequence yields an empty outgoing sequence.
    *
    * @param flow the element-level flow to lift; it is applied once per incoming sequence
    * @tparam I element type consumed by `flow`
    * @tparam O element type produced by `flow`
    * @return a flow from sequences of `I` to sequences of `O`
    */
  def liftToSeq[I, O](flow: Flow[I, O, _]): Flow[Seq[I], Seq[O], NotUsed] =
    Flow[Seq[I]].flatMapConcat { ss =>
      // fold emits its zero value when the sub-stream is empty, so every input
      // sequence produces exactly one output sequence.
      Source(ss).via(flow).fold(Vector.empty[O])(_ :+ _)
    }

  // Usage example: lift an element-wise flow so it operates on whole sequences.
  val stringLength: Flow[String, Int, NotUsed] = Flow[String].map(_.length)
  val stringLengthSeq: Flow[Seq[String], Seq[Int], NotUsed] = liftToSeq(stringLength)

  /**
    * Performs an optimized grouping by key expecting the incoming flow to be ordered by the key.
    *
    * Consecutive elements that share a key are buffered and emitted downstream as a
    * single `Seq` as soon as an element with a different key arrives, or when the
    * upstream finishes. Elements with equal keys that are not adjacent end up in
    * separate groups.
    *
    * @param p the key extractor
    * @tparam T the element type
    * @tparam A the key type; keys are compared with `==`
    */
  final case class GroupAdjacent[T, A](p: T => A) extends GraphStage[FlowShape[T, Seq[T]]] {
    private val in = Inlet[T]("GroupedBySequential.in")
    private val out = Outlet[Seq[T]]("GroupedBySequential.out")

    override val shape: FlowShape[T, Seq[T]] = FlowShape.of(in, out)

    override protected val initialAttributes: Attributes = Attributes.name("groupedBySequential")

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
      new GraphStageLogic(shape) with InHandler with OutHandler {
        // Key of the group currently being buffered; None until the first element arrives.
        private var lastKey: Option[A] = None
        // TODO should we set a size hint on the builder?
        private val buf = Vector.newBuilder[T]

        override def onPush(): Unit = {
          val currentValue = grab(in)
          val key = p(currentValue)
          if (lastKey.forall(_ == key)) {
            // First element, or same key as the running group: keep buffering and
            // ask for more, since nothing has been pushed downstream yet.
            lastKey = Some(key)
            buf += currentValue
            pull(in)
          } else {
            // Key changed: flush the finished group and start a new one with this element.
            lastKey = Some(key)
            val elements = buf.result()
            buf.clear()
            buf += currentValue
            push(out, elements)
          }
        }

        // Downstream demand is forwarded upstream; a group is pushed from onPush.
        override def onPull(): Unit = pull(in)

        override def onUpstreamFinish(): Unit = {
          // Since the upstream has finished we have to push the buffer downstream
          val elements = buf.result()
          if (elements.nonEmpty) {
            // use emit (not push) as out might not yet be ready to receive
            emit(out, elements)
          }
          completeStage()
        }

        setHandlers(in, out, this)
      }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment