This code allows an scodec Codec to be used with the Apache Spark StreamingContext.socketStream.
Note that the head of the BitVector is not retained, which allows the already-consumed pieces of the BitVector to be garbage-collected.
/** Adapts an scodec `Codec` to an `InputStream => Iterator[T]` function. */
object Iterating {

  /** Decodes consecutive values of `T` from `is` and exposes them as an `Iterator`.
    *
    * The iterator keeps a one-element look-ahead: the first value is decoded as
    * soon as the iterator is constructed, and each call to `next()` decodes the
    * following value before returning the current one.
    *
    * Iteration ends at the first decode failure, whatever its cause; at that
    * point the input stream is closed. The failure itself is not reported to
    * the caller.
    *
    * @param d  codec used to decode each element
    * @param is stream to read from; closed by the iterator on the first decode failure
    * @tparam T type of the decoded elements
    * @return an iterator over the successfully decoded values
    */
  def apply[T](d: Codec[T])(is: InputStream): Iterator[T] =
    new Iterator[T] {
      // Only the not-yet-consumed remainder is referenced, so the BitVector
      // pieces that were already decoded can be garbage-collected.
      private var remaining: BitVector = BitVector.fromInputStream(is, 1000)

      // The decoded-but-not-yet-returned element; None once the stream is exhausted.
      private var lookahead: Option[T] = None

      /** Decodes the next element into `lookahead`, or ends the iteration on failure. */
      private def advance(): Unit =
        d.decode(remaining) match {
          case Attempt.Successful(DecodeResult(value, remainder)) =>
            lookahead = Some(value)
            remaining = remainder
          case Attempt.Failure(_) =>
            is.close()
            lookahead = None
            // Drop the reference to whatever is left so it can be collected.
            remaining = BitVector.empty
        }

      def hasNext: Boolean = lookahead.isDefined

      def next(): T =
        lookahead match {
          case Some(value) =>
            advance()
            value
          case None =>
            // Honour the Iterator contract instead of returning a stale value.
            throw new NoSuchElementException("next on exhausted iterator")
        }

      // Prime the look-ahead so that hasNext is meaningful before the first next().
      advance()
    }
}