Much of this is out of date, but leaving it here for posterity. See the scodec-stream and scodec-bits projects, where all the details got worked out.
The current signature of `Codec` in scodec is:

```scala
trait Codec[A] {

  /** Attempts to encode the specified value into a bit vector. */
  def encode(a: A): String \/ BitVector

  /** Attempts to decode a value of type `A` from the specified bit vector. */
  def decode(bits: BitVector): String \/ (BitVector, A)
}
```
Looking at `decode` first. (Minor note: consider having `decode` just return an `Error \/ (Int, A)`, the number of bytes consumed on success. It's unlikely that the decoder needs to insert bytes into the stream it is decoding, since that sort of thing could be expressed in other ways. This also makes it easier to implement streaming, since the caller of `decode` can easily detect how many bytes were read; otherwise we need to somehow infer this from the returned `BitVector`.)
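To make that shape concrete, here is a minimal, self-contained sketch of such a decoder. All names here are hypothetical: `Either` stands in for scalaz's `\/`, and `Vector[Boolean]` for `BitVector`.

```scala
// Hypothetical sketch: a decoder that reports how much input it consumed,
// rather than returning the remaining bits. Either stands in for scalaz's \/.
object DecodeSketch {
  type Bits = Vector[Boolean]

  trait Decoder[A] {
    def decode(bits: Bits): Either[String, (Int, A)]
  }

  // Toy decoder for one byte, read MSB-first: on success it reports
  // (8, value), so a streaming caller knows to advance by exactly 8 bits.
  val byte: Decoder[Int] = new Decoder[Int] {
    def decode(bits: Bits): Either[String, (Int, Int)] =
      if (bits.length < 8) Left(s"needed 8 bits, had ${bits.length}")
      else Right((8, bits.take(8).foldLeft(0)((acc, b) => (acc << 1) | (if (b) 1 else 0))))
  }
}
```

A caller can then drop exactly the reported number of bits before running the next decoder, with no need to compare lengths of the input and the returned remainder.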
For streaming decoding, there are a few general approaches:
- The `Codec` can indicate up front how many bits it will need to read. The driver can then load up a `BitVector` of exactly this size. This is not realistic in general (any kind of context sensitivity will kill this).
- The `decode` function can return a third state, 'not finished', due to insufficient input. So something like: `A => Err \/ (Int, A) \/ Codec[A]`. Kind of weird that the continuation is a `Codec`, so with this approach you'd want to split out a separate `Decoder[A]` type. This can be kind of a pervasive change, though, and it can make writing decoders more complicated because you have to keep in mind the possibility that you might not have all required input.
- The `decode` function can accept an `Int => IO[BitVector]` or something similar, giving the decoder the ability to pull. The return type of `decode` is then something like `IO[Error \/ (Int, A)]`. This is also kind of a big change.
- Or give the decoder access to all the bits (lazily). This is lazy I/O, but if wrapped up in nice safe combinators, it will just be an implementation detail, so it might be fine.
I think this fourth option is probably the right one, because it requires minimal changes to the existing API and means that streaming could be done as a totally separate layer. The way this would work is we'd have a couple of new implementations of `BitVector` that lazily allow access to the entire stream:

- `NIOBitVector` is backed by a `java.nio.ByteBuffer` (which in turn could be created via memory-mapping a large file via `FileChannel.map`) and a range of indices into this `ByteBuffer`. The `drop` and `take` functions of `BitVector` do the obvious thing of adjusting the range of indices rather than actually copying the buffer.
- `InputStreamBitVector` is backed by a `java.io.InputStream` whose results are 'locally' memoized to ensure referential transparency. Unless the decoder does serious backtracking or peeking (which are probably very limited in practice), I think this will 'just work'. I'll need to try it out, though.
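The index-adjusting trick behind `NIOBitVector` can be sketched at byte granularity. This is an illustration with invented names, not the real scodec implementation:

```scala
import java.nio.ByteBuffer

object SliceSketch {
  // A view into a shared ByteBuffer: drop and take only move the
  // [from, until) index range; the underlying bytes are never copied.
  final case class ByteView(buf: ByteBuffer, from: Int, until: Int) {
    def size: Int = until - from
    def drop(n: Int): ByteView = copy(from = (from + n) min until)
    def take(n: Int): ByteView = copy(until = (from + n) min until)
    // Absolute get: reads by index without mutating the buffer's position,
    // so many views can safely share one buffer.
    def get(i: Int): Byte = buf.get(from + i)
  }

  def of(bytes: Array[Byte]): ByteView =
    ByteView(ByteBuffer.wrap(bytes), 0, bytes.length)
}
```

A memory-mapped variant would only swap `ByteBuffer.wrap` for the buffer returned by `FileChannel.map`; `drop` and `take` stay O(1) either way.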
When using these types, there is the problem that decoders could in theory retain the `BitVector` after decoding completes (returning it as part of the output value). We could prevent this using `ST`, but the overhead of this (syntactic, computational, and having to worry about SOEs) isn't worth it IMO. I just don't think this bug is very common; people would generally have to be doing something very deliberately wrong to retain and reuse a `BitVector` after decoding completes, especially if they are assembling their decoders via combinators.
Once we have these new `BitVector` implementations, we can now write combinators like:

```scala
/** Parses multiple `A` values, one after another, from the `InputStream`. */
def many[A: Codec](in: InputStream): Process[Task, A]

/** Parses multiple `A` values, separated by `D` ('delimiter') values, which are ignored. */
def sep[A: Codec, D: Codec](in: InputStream): Process[Task, A]
```
And whatever other parsing combinators are useful. The returned `Process[Task, A]` can then be manipulated using all the usual scalaz-stream combinators. Internally, combinators like `many` and `sep` will use an `NIOBitVector` or `InputStreamBitVector` which is allocated locally and then closed by the returned `Process[Task, A]` on completion. So long as the `Codec` for `A` doesn't hold on to the `BitVector` it gets and return it as part of the `A`, this is safe.
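A rough stand-in for `many`, using `Iterator` in place of `Process[Task, A]` and plain `Either`/`Vector[Boolean]` in place of the scalaz and scodec types (all names hypothetical):

```scala
object ManySketch {
  type Bits = Vector[Boolean]

  trait Decoder[A] {
    def decode(bits: Bits): Either[String, (Int, A)]
  }

  // Repeatedly run the decoder, advancing past exactly the bits it
  // consumed, until the input is exhausted.
  def many[A](d: Decoder[A])(bits: Bits): Iterator[A] = new Iterator[A] {
    private var rest = bits
    def hasNext: Boolean = rest.nonEmpty
    def next(): A = d.decode(rest) match {
      case Right((n, a)) => rest = rest.drop(n); a
      case Left(err)     => throw new RuntimeException(err)
    }
  }
}
```

The real version would differ mainly in resource handling: the locally allocated `BitVector` gets closed by the `Process` on completion, which a bare `Iterator` cannot express.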
The signature of `encode` is problematic for streaming:

```scala
def encode(a: A): Error \/ BitVector
```

Because `Error` is on the outside, we need to know up front whether an error occurred while encoding, which implies that the `BitVector` must be produced strictly. I think this is actually okay, though; we should just introduce streaming encoding as a separate layer. So the individual 'elements' will be written strictly, but the top-level stream container will be lazy. We can assemble streaming encoders also via combinators:
```scala
package scodec.streaming

object Encode {
  type Bitstream = Process[Task, BitVector]

  /** Output each `A` one after another to the `Bitstream`. */
  def many[A: Codec](in: Process[Task, A]): Bitstream

  /** Output each `A` one after another, with `D` as the delimiter. */
  def sep[A: Codec, D: Codec](delim: D)(in: Process[Task, A]): Bitstream
}
```
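A stand-in for `Encode.many` in the same spirit, with `Iterator[Vector[Boolean]]` playing the role of `Bitstream` (the names are hypothetical):

```scala
object EncodeSketch {
  type Bits = Vector[Boolean]

  trait Encoder[A] {
    def encode(a: A): Either[String, Bits]
  }

  // Each element is encoded strictly (errors surface per element), but
  // the overall stream of encoded chunks is produced lazily.
  def many[A](e: Encoder[A])(in: Iterator[A]): Iterator[Bits] =
    in.map(a => e.encode(a).fold(err => throw new RuntimeException(err), identity))
}
```

This captures the layering described above: strictness is confined to each element's `encode`, while the container of chunks stays lazy.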
Etc. We can then manipulate the `Bitstream` using the usual scalaz-stream combinators, send it to sinks, and so on. Also, the output of a streaming decode operation can be fed to these combinators, so we can express use cases like: do a streaming decode of packets from a 1GB memory-mapped file, manipulate these packets in some way, and dump them to some output file, working incrementally and in constant memory. Nice!