Skip to content

Instantly share code, notes, and snippets.

@searler
Created June 3, 2015 22:28
Show Gist options
  • Save searler/e58604ce3b73c8a044bc to your computer and use it in GitHub Desktop.
Save searler/e58604ce3b73c8a044bc to your computer and use it in GitHub Desktop.
Extracting length-delimited ByteStrings from an Akka reactive stream
/**
 * Demo application: cuts a stream of `ByteString` chunks into length-delimited frames
 * with an Akka Streams `StatefulStage` and prints every frame.
 *
 * Wire format as implemented here: the first byte of each frame is read as an unsigned
 * length (0-255) that counts the length byte itself, and the emitted frame still contains
 * that leading length byte. For the sample input `(2, 3, 3, 4, 5)` the frames are
 * `(2, 3)` and `(3, 4, 5)`.
 */
object Framed extends App {

  /**
   * Factory for the framing stage.
   *
   * Every invocation of the returned function creates a fresh stage with its own buffer,
   * so one stage instance is never shared between materializations.
   *
   * @return a function producing a new stage that re-chunks incoming bytes into frames
   */
  def parseFrames: () => StatefulStage[ByteString, ByteString] =
    () => new StatefulStage[ByteString, ByteString] {
      // Bytes received so far that have not yet been emitted as part of a complete frame.
      // NOTE(review): nothing here handles bytes still buffered when upstream finishes, so a
      // truncated trailing frame appears to be dropped silently — confirm this is acceptable.
      private var buffer = ByteString.empty

      def initial = new State {
        /** Appends the incoming chunk to the buffer and emits every frame that is now complete. */
        override def onPush(chunk: ByteString, ctx: Context[ByteString]): SyncDirective = {
          buffer ++= chunk
          emit(doFraming(Vector.empty).iterator, ctx)
        }

        /**
         * Repeatedly cuts complete frames off the front of `buffer`.
         *
         * @param parsedSoFar frames already extracted during this invocation
         * @return all complete frames, in stream order; incomplete trailing bytes stay in `buffer`
         * @throws IllegalArgumentException if a length byte is 0, which cannot describe a frame
         *                                  because the length must at least cover the length byte
         */
        @tailrec
        private def doFraming(parsedSoFar: Vector[ByteString]): Vector[ByteString] =
          // `& 0xFF` reinterprets the signed byte as an unsigned value in 0-255.
          buffer.headOption.map(_ & 0xFF) match {
            case None => parsedSoFar
            case Some(0) =>
              // A zero length would consume nothing from the buffer, so the recursion would
              // never terminate and would keep accumulating empty frames. Reject it instead.
              // NOTE(review): the exception is expected to fail the stream under the default
              // supervision strategy — confirm for the Akka Streams version in use.
              throw new IllegalArgumentException(
                "Invalid frame: length byte is 0, but the length must include the length byte itself")
            // Not enough bytes buffered yet for the announced frame; wait for the next chunk.
            case Some(nextLength) if nextLength > buffer.size => parsedSoFar
            case Some(nextLength) =>
              val frame = buffer.take(nextLength)
              buffer = buffer.drop(nextLength)
              doFraming(parsedSoFar :+ frame)
          }
      }
    }

  implicit val system: ActorSystem = ActorSystem("Sys")
  // Brings the actor system's dispatcher into implicit scope for the Future callback below.
  import system.dispatcher
  implicit val materializer = ActorFlowMaterializer()

  // Push a single chunk containing two frames through the framing stage, print each frame,
  // then shut the actor system down once the stream completes, successfully or not.
  Source.apply(() => Seq(ByteString(2, 3, 3, 4, 5)).iterator)
    .transform(() => parseFrames())
    .runForeach { println }
    .onComplete { case _ => system.shutdown() }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment