@searler
Created March 24, 2015 03:05
Wrap a Codec in an Iterator that decodes an InputStream

This code adapts an scodec Codec for use with Apache Spark's StreamingContext.socketStream, which expects a converter function from InputStream to Iterator.
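As a rough sketch of how the wiring might look, the snippet below passes the iterator as the converter argument of socketStream. It assumes the Iterating object defined below in this gist is on the classpath; the SocketExample object, the decodedInts helper, the int32 codec, and the storage level are hypothetical choices made only for illustration.

```scala
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import scodec.codecs.int32

object SocketExample {
  // Hypothetical helper: host, port, codec, and storage level are illustrative.
  def decodedInts(ssc: StreamingContext, host: String, port: Int): ReceiverInputDStream[Int] =
    ssc.socketStream[Int](
      host,
      port,
      // socketStream expects an InputStream => Iterator[T] converter.
      is => Iterating(int32)(is),
      StorageLevel.MEMORY_AND_DISK_SER
    )
}
```

Any other Codec[T] could be substituted for int32, as long as the element type given to socketStream matches.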

Note that the head of the BitVector is not retained: bv is reassigned to the remainder after each decode, so the consumed BitVector pieces can be garbage collected.

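import java.io.InputStream
import scodec.{Attempt, Codec, DecodeResult}
import scodec.bits.BitVector

object Iterating {
  // Wraps `is` in an Iterator that decodes one T at a time using codec `d`.
  def apply[T](d: Codec[T])(is: InputStream): Iterator[T] =
    new Iterator[T] {
      // Lazily backed by the stream, read in 1000-byte chunks.
      private var bv = BitVector.fromInputStream(is, 1000)
      // Value decoded ahead of the caller; only meaningful while bv != null.
      private var current: T = _

      // Decode the next value, or mark the iterator exhausted on failure.
      private def parse(): Unit =
        d.decode(bv) match {
          case Attempt.Successful(DecodeResult(value, remainder)) =>
            current = value
            bv = remainder // drop the reference to the consumed bits
          case Attempt.Failure(_) =>
            // Any decode failure, including running out of input, ends iteration.
            is.close()
            bv = null
        }

      def hasNext: Boolean = bv != null

      def next(): T = {
        if (!hasNext) throw new NoSuchElementException("no more decoded values")
        val c = current
        parse()
        c
      }

      parse() // prime `current` with the first value
    }
}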
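The iterator can be exercised without Spark by feeding it an in-memory stream. The following is a minimal sketch, assuming the Iterating object above and scodec's int32 codec; the IteratingDemo object and the sample values are hypothetical and exist only for this example.

```scala
import java.io.ByteArrayInputStream
import scodec.codecs.int32

object IteratingDemo {
  // Encode three sample ints back to back (int32 takes 4 bytes each).
  val bytes: Array[Byte] =
    List(1, 2, 3).map(n => int32.encode(n).require.toByteArray).reduce(_ ++ _)

  // Decode them again through the iterator; iteration stops once
  // the remaining bits are too few to decode another value.
  def decoded: List[Int] =
    Iterating(int32)(new ByteArrayInputStream(bytes)).toList
}
```

Here IteratingDemo.decoded should give back List(1, 2, 3): the fourth decode attempt fails for lack of input, which closes the stream and ends the iteration.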