Skip to content

Instantly share code, notes, and snippets.

@searler
Created June 4, 2015 21:44
Show Gist options
  • Save searler/aa41b118c4521c75f607 to your computer and use it in GitHub Desktop.
Save searler/aa41b118c4521c75f607 to your computer and use it in GitHub Desktop.
Apply a scodec encoder to an Akka reactive stream
package scodec
import scala.Vector
import scala.collection.immutable.Seq
import akka.actor.ActorSystem
import akka.stream.ActorFlowMaterializer
import akka.stream.scaladsl.Source
import akka.stream.stage.Context
import akka.stream.stage.PushStage
import akka.stream.stage.SyncDirective
import akka.util.ByteString
import scodec.Attempt.Successful
import scodec.codecs.uint
import scodec.codecs.variableSizeBytes
import scodec.codecs.vector
/**
 * Demo application: pushes a few `Vector[Int]` values through an Akka stream,
 * encodes each one with a scodec `Encoder`, and prints the resulting
 * `ByteString`s. The actor system is shut down once the stream completes,
 * whether it succeeded or failed.
 */
object Encoded extends App {

  /**
   * Builds a stream stage that encodes every incoming element with `encoder`.
   *
   * A successful encoding is pushed downstream as a `ByteString` holding the
   * encoded bytes. Any other encoding result fails the stream with a
   * `RuntimeException` whose message is the `toString` of that result.
   *
   * @param encoder the scodec encoder applied to each element
   * @tparam R      the element type accepted by the stage
   * @return a `PushStage` turning `R` elements into `ByteString`s
   */
  def parse[R](encoder: Encoder[R]): PushStage[R, ByteString] =
    new PushStage[R, ByteString] {
      def onPush(value: R, ctx: Context[ByteString]): SyncDirective =
        encoder.encode(value) match {
          case Successful(bits) => ctx.push(ByteString(bits.bytes.toArray))
          // Every non-successful attempt terminates the stream with an error.
          case failed => ctx.fail(new RuntimeException(failed.toString))
        }
    }

  // Per the scodec combinators used: a vector of 8-bit unsigned ints, prefixed
  // by its size in bytes stored as an 8-bit unsigned int.
  val encoder: Encoder[Vector[Int]] = variableSizeBytes(uint(8), vector(uint(8)))

  implicit val system: ActorSystem = ActorSystem("Sys")
  // Execution context required by `onComplete` below.
  import system.dispatcher
  implicit val materializer: ActorFlowMaterializer = ActorFlowMaterializer()

  // NOTE(review): Integer.MAX_VALUE presumably is out of range for uint(8) on
  // purpose, to exercise the failure path - confirm.
  Source(Seq(Vector(1, 2), Vector(1, Integer.MAX_VALUE), Vector(1, 11)))
    .transform(() => parse(encoder))
    .runForeach(println)
    .onComplete { outcome =>
      // Report the outcome, then always release the actor system so the JVM
      // can exit even if reporting itself throws.
      try {
        outcome match {
          case scala.util.Failure(rte) => println(rte.getMessage)
          case other => println(other)
        }
      } finally {
        system.shutdown()
      }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment