Last active
August 29, 2015 14:22
-
-
Save ajaychandran/a179c519b4c45753a334 to your computer and use it in GitHub Desktop.
scodec: FilterCodec
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scodec.codecs | |
import scodec._ | |
import scodec.bits.{ByteVector, BitVector} | |
/**
 * Factory methods for codecs that pair a checksum encoder with a decoder
 * that carves the checksummed bit-range out of the input.
 *
 * In every variant the `range` decoder is run against the start of the input
 * to obtain a size, and that many bits are then taken from the start of the
 * SAME input (not from what the size decoder left over). The `padding`
 * parameters are added to the decoded size.
 */
object ChecksumCodec {

  /**
   * Returns a codec that encodes a bit-range to a bit-checksum and decodes bits to a bit-range.
   *
   * @param encoder encodes a bit-range to a bit-checksum
   * @param range decodes the size (in bits) of the bit-range
   * @return a codec whose decoded value is the leading bit-range of the input and whose
   *         remainder is everything after it; fails with `Err.InsufficientBits` when the
   *         input is shorter than the decoded size
   */
  def apply(encoder: Encoder[BitVector], range: Decoder[Long]): Codec[BitVector] =
    Codec(encoder, Decoder(
      bits => range.decode(bits).flatMap(
        size => bits.consumeThen(size.value)(
          e => Attempt.failure(Err.InsufficientBits(size.value, bits.size, List(e))),
          // the covered range is taken from the original input, so it includes the size field
          (covered, remainder) => Attempt.successful(DecodeResult(covered, remainder))))))

  /**
   * Returns a codec that encodes a bit-range to a bit-checksum and decodes bits to a bit-range.
   *
   * @param encoder encodes a bit-range to a bit-checksum
   * @param range decodes the (un-padded) size of a bit-range, in bits
   * @param padding number of bits added to the decoded size
   * @return a codec over the bit-range of `decoded size + padding` bits
   */
  def apply(encoder: Encoder[BitVector], range: Decoder[Long], padding: Long): Codec[BitVector] =
    apply(encoder, range.map(_ + padding))

  /**
   * Returns a codec that encodes a byte-range to a byte-checksum and decodes bits to a bit-range.
   *
   * @param encoder encodes a byte-range to a byte-checksum
   * @param range decodes the (un-padded) size of a byte-range, in bytes
   * @param padding number of bytes added to the decoded size
   * @return a codec over the bit-range of `8 * (decoded size + padding)` bits
   */
  def apply(encoder: Encoder[ByteVector], range: Decoder[Int], padding: Int): Codec[BitVector] =
    apply(encoder.contramap[BitVector](_.bytes), range.map(8L * _), 8L * padding)

  /**
   * Returns a codec that encodes a bit-range to a bit-checksum and decodes bits to a bit-range.
   *
   * @param length the bit-length of the checksum (reported as the encoder's exact size bound)
   * @param f computes the bit-checksum
   * @param range decodes the (un-padded) size of a bit-range, in bits
   * @param padding number of bits added to the decoded size
   * @return a codec over the bit-range of `decoded size + padding` bits
   */
  def apply(length: Long, f: BitVector => BitVector, range: Decoder[Long], padding: Long): Codec[BitVector] =
    apply(new Encoder[BitVector] {
      def encode(value: BitVector): Attempt[BitVector] = Attempt.successful(f(value))
      def sizeBound: SizeBound = SizeBound.exact(length)
    }, range, padding)

  /**
   * Returns a codec that encodes a byte-range to a byte-checksum and decodes bits to a bit-range.
   *
   * @param length the byte-length of the checksum
   * @param f computes the byte-checksum
   * @param range decodes the (un-padded) size of a byte-range, in bytes
   * @param padding number of bytes added to the decoded size
   * @return a codec over the bit-range of `8 * (decoded size + padding)` bits
   */
  def apply(length: Int, f: ByteVector => ByteVector, range: Decoder[Int], padding: Int): Codec[BitVector] =
    // bytes -> bits is a multiplication by 8 (this previously added 8 to the decoded size)
    apply(8L * length, (bits: BitVector) => f(bits.bytes).bits, range.map(8L * _), 8L * padding)

  /**
   * Returns a codec that encodes a bit-range to an XORed bit-checksum and decodes bits to a bit-range.
   *
   * The checksum is computed by splitting the range into groups of `length` bits and
   * XOR-folding them, starting from `length` zero bits.
   *
   * @param length the bit-length of the checksum
   * @param range decodes the (un-padded) size of a bit-range, in bits
   * @param padding number of bits added to the decoded size
   * @return a codec over the bit-range of `decoded size + padding` bits
   */
  def xor(length: Long, range: Decoder[Long], padding: Long): Codec[BitVector] =
    // NOTE(review): when the range size is not a multiple of `length` the last group is
    // shorter than `length`; confirm how `xor` treats operands of unequal size.
    apply(length, (bits: BitVector) => bits.grouped(length).foldLeft(BitVector.low(length))(_ xor _), range, padding)

  /**
   * Returns a codec that encodes a bit-range to an XORed bit-checksum and decodes bits to a bit-range.
   *
   * @param length the byte-length of the checksum
   * @param range decodes the (un-padded) size of a byte-range, in bytes
   * @param padding number of bytes added to the decoded size
   * @return a codec over the bit-range of `8 * (decoded size + padding)` bits
   */
  def xor(length: Int, range: Decoder[Int], padding: Int): Codec[BitVector] =
    xor(8L * length, range.map(8L * _), 8L * padding)

  /**
   * Error describing a checksum that did not match.
   *
   * @param bits the bit-range the checksum was computed over
   * @param expected the checksum computed from `bits`
   * @param actual the checksum found in the input
   * @param context error context stack, innermost first
   */
  case class Mismatch(bits: BitVector, expected: BitVector, actual: BitVector, context: List[String] = Nil) extends Err {
    def message: String = s"checksum mismatch for bits: $bits, expected: $expected, actual: $actual"
    def pushContext(ctx: String): Err = copy(context = ctx :: context)
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package scodec.codecs | |
import scodec._ | |
import scodec.bits._ | |
/**
 * Tests for `checksummed` combined with a one-byte XOR [[ChecksumCodec]].
 *
 * The codec under test frames a UTF-8 string with a 32-bit length prefix; the
 * checksum range is the length prefix (4 bytes of padding) plus the payload.
 */
class ChecksumCodecTest extends CodecSuite {

  "checksummed codec" should {

    val codec = checksummed(ChecksumCodec.xor(1, int32, 4), variableSizeBytes(int32, utf8))

    // "hello world" framed with its length and followed by the XOR checksum byte 0x2b
    val encoded = hex"0x0000000b68656c6c6f20776f726c642b".bits

    "roundtrip" in {
      forAll { (s: String) =>
        roundtrip(codec, s)
      }
    }

    "roundtrip using combinators" in {
      forAll { (n: Int, s: String) =>
        roundtrip(int32 ~ codec, n ~ s)
      }
    }

    "append checksum on encode" in {
      codec.encode("hello world").require should equal(encoded)
    }

    "verify (and remove) checksum on decode" in {
      val decoded = codec.decode(encoded).require
      decoded.value should equal("hello world")
      decoded.remainder should equal(BitVector.empty)
    }

    "fail decoding on checksum mismatch" in {
      // same frame, but with the checksum byte replaced by 0x00
      val corrupted = hex"0x0000000b68656c6c6f20776f726c6400".bits
      val mismatch = ChecksumCodec.Mismatch(hex"0x0000000b68656c6c6f20776f726c64".bits, hex"2b".bits, hex"00".bits)
      codec.decode(corrupted) should equal(Attempt.failure(mismatch))
    }
  }
}
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Codec that passes bits through a filter after encoding and before decoding.
 *
 * On encode, the target codec runs first and its output is fed to `filter.encode`.
 * On decode, `filter.decode` runs first; the target codec then decodes the filter's
 * value, and whatever the target leaves over is prepended to the filter's remainder.
 *
 * @param filter a codec that represents pre/post-processing stages for input/output bits
 * @param codec the target codec
 * @tparam A the result type
 * @return a codec for `A` whose size bound is that of `filter`
 */
def filtered[A](filter: Codec[BitVector], codec: Codec[A]): Codec[A] = new Codec[A] {
  def sizeBound: SizeBound = filter.sizeBound

  def encode(value: A): Attempt[BitVector] =
    codec.encode(value).flatMap(filter.encode)

  def decode(bits: BitVector): Attempt[DecodeResult[A]] =
    for {
      unfiltered <- filter.decode(bits)
      decoded <- codec.decode(unfiltered.value)
    } yield decoded.mapRemainder(_ ++ unfiltered.remainder)
}
/**
 * Codec that appends a checksum on encode and verifies (and strips) it on decode.
 *
 * On encode, the target codec's output is followed by `checksum.encode` of that output.
 * On decode, `checksum.decode` selects the checksummed bit-range, the checksum is
 * recomputed from it and compared with the bits that follow the range; on a match the
 * range is handed to the target codec, otherwise decoding fails with
 * [[ChecksumCodec.Mismatch]].
 *
 * @param checksum a codec that encodes a bit-range to a bit-checksum and decodes bits to a bit-range
 * @param codec the target codec
 * @tparam A the result type
 * @return a codec for `A` whose encoded form is the target encoding followed by its checksum
 * @see [[ChecksumCodec]]
 */
def checksummed[A](checksum: Codec[BitVector], codec: Codec[A]): Codec[A] = filtered(new Codec[BitVector] {
  // the encoded form is `value ++ checksum`, so its size is the sum of both bounds
  // (reporting only the checksum's bound would under-report the encoded size)
  def sizeBound: SizeBound = codec.sizeBound + checksum.sizeBound

  def encode(value: BitVector): Attempt[BitVector] = checksum.encode(value).map(value ++ _)

  def decode(bits: BitVector): Attempt[DecodeResult[BitVector]] =
    checksum.decode(bits).flatMap(
      ranged => checksum.encode(ranged.value).flatMap(
        // the recomputed checksum's size tells how many bits to read as the stored checksum
        expected => ranged.remainder.consumeThen(expected.size)(
          e => Attempt.failure(Err.InsufficientBits(expected.size, ranged.remainder.size, List(e))),
          (actual, remainder) =>
            if (expected == actual) Attempt.successful(DecodeResult(ranged.value, remainder))
            else Attempt.failure(ChecksumCodec.Mismatch(ranged.value, expected, actual)))))
}, codec)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment