Skip to content

Instantly share code, notes, and snippets.

@searler
Created June 3, 2015 22:29
Show Gist options
  • Save searler/2befa6c7567b7719d065 to your computer and use it in GitHub Desktop.
Save searler/2befa6c7567b7719d065 to your computer and use it in GitHub Desktop.
Apply a scodec Decoder to an Akka reactive stream.
import akka.stream.stage.StatefulStage
import akka.util.ByteString
import akka.stream.stage.Context
import akka.stream.stage.SyncDirective
import scala.annotation.tailrec
import akka.stream.ActorFlowMaterializer
import akka.actor.ActorSystem
import akka.stream.scaladsl.Source
import akka.stream.scaladsl.Sink
import scala.concurrent.Await
import scodec.codecs._
import scodec.bits._
import scodec.Attempt.Successful
/**
 * Demo: applies a scodec `Decoder` to an Akka reactive stream of `ByteString` chunks.
 *
 * Running this object pushes a few hard-coded chunks through [[parse]], prints every
 * decoded value, and shuts the actor system down once the stream completes.
 */
object Decoded extends App {
  // Imported locally: the file-level imports visible here do not name `Decoder` or
  // `DecodeResult` explicitly, and this keeps the block self-contained.
  import scodec.{DecodeResult, Decoder}

  /**
   * Builds a stage that re-frames an arbitrary chunking of bytes into decoded values.
   *
   * Incoming chunks are appended to an internal bit buffer; after every chunk the
   * decoder is applied repeatedly and each successfully decoded value is emitted
   * downstream. Whatever the decoder leaves over stays buffered for the next chunk.
   *
   * Caveats:
   *  - Any decode failure is interpreted as "not enough input yet"; the stage simply
   *    waits for more bytes. A decoder that fails for another reason therefore makes
   *    the stage buffer indefinitely without emitting.
   *  - A successful decode that consumes no bits is ignored (its value is not
   *    emitted); otherwise the decode loop could never terminate.
   *  - The stage does not override upstream-finish handling, so bits still buffered
   *    at that point are not decoded further.
   *
   * @param decoder decoder applied repeatedly to the buffered bits
   * @tparam R type of the decoded values emitted downstream
   * @return a new, stateful stage; create one per materialization
   */
  def parse[R](decoder: Decoder[R]): StatefulStage[ByteString, R] =
    new StatefulStage[ByteString, R] {
      // Bits received so far that have not yet been consumed by a successful decode.
      private var bitBuffer = BitVector.empty

      def initial = new State {
        override def onPush(chunk: ByteString, ctx: Context[R]): SyncDirective = {
          // Append the new chunk (possibly made of several buffers) to the pending bits.
          bitBuffer = chunk.asByteBuffers.foldLeft(bitBuffer)((acc, bb) => acc ++ BitVector(bb))
          emit(doParsing(Vector.empty).iterator, ctx)
        }

        /** Decodes as many complete values as the buffer currently holds. */
        @tailrec
        private def doParsing(parsedSoFar: Vector[R]): Vector[R] =
          if (bitBuffer.isEmpty) parsedSoFar // nothing left to decode
          else
            decoder.decode(bitBuffer) match {
              // Only accept a result that made progress; a zero-width success would
              // otherwise be returned again and again, looping forever.
              case Successful(DecodeResult(value, remainder)) if remainder.size < bitBuffer.size =>
                bitBuffer = remainder
                doParsing(parsedSoFar :+ value)
              // Failure (treated as "need more input") or a success without progress.
              case _ => parsedSoFar
            }
      }
    }

  // Frame layout: an unsigned 8-bit size prefix, then a payload decoded as a vector
  // of unsigned 8-bit values.
  val decoder: Decoder[Vector[Int]] = variableSizeBytes(uint(8), vector(uint(8)))

  implicit val system: ActorSystem = ActorSystem("Sys")
  import system.dispatcher
  implicit val materializer: ActorFlowMaterializer = ActorFlowMaterializer()

  // The chunk boundaries deliberately do not line up with the frame boundaries.
  Source.apply(() => Seq(ByteString(2), ByteString(6), ByteString(7), ByteString(1, 3, 2, 4, 5)).iterator)
    .transform(() => parse(decoder))
    .runForeach(println)
    .onComplete { result =>
      // Report a failed stream instead of dropping the cause silently.
      result.failed.foreach(cause => Console.err.println(s"Stream failed: $cause"))
      system.shutdown()
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment