Last active
August 29, 2015 13:57
-
-
Save pchiusano/9769027 to your computer and use it in GitHub Desktop.
Break a binary input stream along frame boundaries using a pure `Process1`
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scodec.codecs | |
import scodec.bits.{BitVector,ByteVector} | |
import scalaz.stream.{Process1,Process} | |
object Unframing {

  /**
   * Splits a length-prefixed byte stream into its individual frame payloads.
   *
   * The input is a sequence of frames. Each frame starts with a size header that is
   * decoded with `codecs.int32` and is followed by that many payload bytes. A header
   * whose decoded size is `<= 0` marks the end of the stream and halts the process.
   * Upstream chunks do not need to line up with frame boundaries: bytes are buffered
   * across chunks until a full header or a full payload is available.
   *
   * @return a `Process1` that consumes raw chunks and emits one `ByteVector` per frame
   *         payload; it fails with an `IllegalArgumentException` if the header cannot
   *         be decoded
   */
  def unframe: Process1[ByteVector,ByteVector] = {

    // Requests the next upstream chunk and continues with whatever `next` builds from it.
    def pull(next: ByteVector => Process1[ByteVector,ByteVector]): Process1[ByteVector,ByteVector] =
      Process.await1[ByteVector].flatMap(chunk => next(chunk))

    // State 1: collecting the 4 header bytes. Once they are buffered, decode the frame
    // size and either stop (size <= 0) or switch to payload collection, handing over
    // whatever bits remain after the header.
    def frameHeader(acc: ByteVector): Process1[ByteVector,ByteVector] =
      if (acc.size >= 4)
        codecs.int32.decode(acc.toBitVector).fold(
          errMsg => Process.fail(new IllegalArgumentException(errMsg)),
          { case (rem,size) => if (size > 0) readFrame(size,rem) else Process.halt }
        )
      else
        pull(chunk => frameHeader(acc ++ chunk))

    // State 2: collecting `bytesToRead` payload bytes. When enough are buffered, emit
    // the payload and go back to header collection, seeded with the surplus bytes.
    def readFrame(bytesToRead: Int, bits: BitVector): Process1[ByteVector,ByteVector] =
      if (bits.size / 8 < bytesToRead)
        pull(chunk => readFrame(bytesToRead, bits ++ BitVector(chunk)))
      else {
        val buffered = bits.toByteVector
        val payload = buffered.take(bytesToRead)
        Process.emit(payload) fby frameHeader(buffered.drop(bytesToRead))
      }

    // Start out waiting for the first header with nothing buffered.
    frameHeader(ByteVector.empty)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment