Skip to content

Instantly share code, notes, and snippets.

@mpilquist
Created June 24, 2019 15:55
Show Gist options
  • Save mpilquist/8bfbb7891718e1ce8b5687a157c65628 to your computer and use it in GitHub Desktop.
Save mpilquist/8bfbb7891718e1ce8b5687a157c65628 to your computer and use it in GitHub Desktop.
Sketch of StreamDecoder
package scodec.stream.decode
import language.higherKinds
import fs2._
import scodec.{ Attempt, Decoder, DecodeResult, Err }
import scodec.bits.BitVector
trait StreamDecoder3[+A] { self =>
import StreamDecoder3._
protected def step: StepResult[A]
def toPipe[F[_]: RaiseThrowable]: Pipe[F, BitVector, A] =
in => decode(in).void.stream
def toPipeByte[F[_]: RaiseThrowable]: Pipe[F, Byte, A] =
in => in.chunks.map(_.toBitVector).through(toPipe)
def decode[F[_]: RaiseThrowable](s: Stream[F, BitVector]): Pull[F, A, Option[Stream[F, BitVector]]] =
step match {
case Result(a) => Pull.output1(a).as(Some(s))
case Append(x, y) => x.decode(s).flatMap {
case None => Pull.pure(None)
case Some(rem) => y().decode(rem)
}
case Decode(decoder) =>
def loop(carry: BitVector, s: Stream[F, BitVector]): Pull[F, A, Option[Stream[F, BitVector]]] = {
s.pull.uncons1.flatMap {
case Some((hd, tl)) =>
val buffer = carry ++ hd
decoder(buffer) match {
case Attempt.Successful(DecodeResult(value, remainder)) =>
val next = if (remainder.isEmpty) tl else tl.cons1(remainder)
value.decode(next)
case Attempt.Failure(e: Err.InsufficientBits) =>
loop(buffer, tl)
case Attempt.Failure(e) =>
Pull.raiseError(DecodingError(e))
}
case None => if (carry.isEmpty) Pull.pure(None) else Pull.pure(Some(Stream(carry)))
}
}
loop(BitVector.empty, s)
}
def flatMap[B](f: A => StreamDecoder3[B]): StreamDecoder3[B] = new StreamDecoder3[B] {
def step: StepResult[B] = self.step match {
case Result(a) => f(a).step
case Decode(g) => Decode { in => g(in).map(_.map(_.flatMap(f))) }
case Append(x, y) => Append(x.flatMap(f), () => y().flatMap(f))
}
}
final def ++[A2>:A](d: => StreamDecoder3[A2]): StreamDecoder3[A2] = new StreamDecoder3[A2] {
def step: StepResult[A2] = Append(self, () => d)
}
final def repeat: StreamDecoder3[A] = this ++ repeat
final def map[B](f: A => B): StreamDecoder3[B] = flatMap(a => StreamDecoder3.pure(f(a)))
}
object StreamDecoder3 {
private[StreamDecoder3] sealed trait StepResult[+A]
private[StreamDecoder3] final case class Result[A](value: A) extends StepResult[A]
private[StreamDecoder3] final case class Decode[A](f: BitVector => Attempt[DecodeResult[StreamDecoder3[A]]]) extends StepResult[A]
private[StreamDecoder3] final case class Append[A](x: StreamDecoder3[A], y: () => StreamDecoder3[A]) extends StepResult[A]
def pure[A](a: A): StreamDecoder3[A] = new StreamDecoder3[A] {
val step = Result(a)
}
def decode[A](decoder: Decoder[A]): StreamDecoder3[A] = new StreamDecoder3[A] {
val step: StepResult[A] = Decode(in => decoder.decode(in).map(_.map(pure)))
}
def many[A](decoder: Decoder[A]): StreamDecoder3[A] = decode(decoder).repeat
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment