Created November 26, 2009 11:03
-
-
Save kevinweil/243400 to your computer and use it in GitHub Desktop.
A Scala class for reading a file of protobufs stored as a sequence of records, each a 4-byte length followed by that many bytes of serialized message. Based on @al3x's Thrift version of the same idea at http://gracelessfailures.com/a-generic-thrift-deserializer-in-scala
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.google.protobuf.Message
import java.io.{BufferedReader, BufferedInputStream, DataInputStream, File, FileInputStream, IOException, InputStream, InputStreamReader}
import java.util.zip.GZIPInputStream
import scala.reflect.Manifest
/**
 * Streams protobuf messages of type `M` out of a file of length-prefixed records.
 *
 * The file is a sequence of records, each a 4-byte big-endian length (as read by
 * `DataInputStream.readInt`) followed by that many bytes of serialized `M`. Files whose
 * name ends in ".gz" are gunzipped transparently before the records are read.
 *
 * Each scan stores the file and stream it is reading in the public mutable fields
 * below, so one instance should not be used for concurrent scans.
 *
 * @tparam M a protoc-generated message class; it must expose the static
 *           `getDefaultInstance()` method that generated classes provide.
 */
class ProtobufFileScanner[M <: Message](implicit man: Manifest[M]) {
  /** The file most recently opened by `allRecordsFromFile`; null before the first call. */
  var file: File = null

  /**
   * Buffered view of the decompressed record stream being read by `allRecordsFromFile`
   * (the file stream itself for uncompressed files); null before the first call.
   */
  var bufferedIn: BufferedInputStream = null

  /**
   * Returns true if at least one more byte can be read from `bufferedIn`, i.e. the
   * current stream is not at EOF. Returns false if no stream has been opened yet.
   */
  def streamHasNext: Boolean =
    bufferedIn != null && {
      // Peek one byte without consuming it.
      bufferedIn.mark(1)
      val nextByte = bufferedIn.read()
      bufferedIn.reset()
      nextByte != -1
    }

  /**
   * Reads every record in `fileName`, parses it as an `M`, and passes it to `f`.
   *
   * @param fileName path of the file to read; a ".gz" suffix selects gzip decoding
   * @param f        callback invoked once per record, in file order, with a freshly
   *                 parsed message
   * @throws java.io.IOException if the file cannot be read, a record length is
   *         negative, or the file ends in the middle of a record (`EOFException`)
   * @throws com.google.protobuf.InvalidProtocolBufferException if a record's bytes
   *         are not a valid `M`
   */
  def allRecordsFromFile(fileName: String)(f: M => Unit): Unit = {
    file = new File(fileName)
    // Resolve the prototype before opening the file so a failure here leaks nothing.
    val prototype = defaultInstance
    val fileIn = new BufferedInputStream(new FileInputStream(file), 2048)
    // Outermost stream opened so far. Closing it closes everything beneath it; it is
    // still `fileIn` if the GZIPInputStream constructor throws (e.g. a bad gzip header).
    var outermost: InputStream = fileIn
    try {
      // For .gz files the EOF check must run on the *decompressed* stream: the
      // GZIPInputStream reads ahead, so the raw file can hit EOF while records remain.
      bufferedIn =
        if (fileName.endsWith(".gz"))
          new BufferedInputStream(new GZIPInputStream(fileIn, 2048), 2048)
        else fileIn
      outermost = bufferedIn
      val stream = new DataInputStream(bufferedIn)
      while (streamHasNext) {
        val arraySize = stream.readInt()
        if (arraySize < 0)
          throw new IOException(s"Negative record length $arraySize in $fileName")
        val bytes = new Array[Byte](arraySize)
        stream.readFully(bytes)
        // Build a new message per record; the original reused one object and never
        // stored the parse result, so `f` never saw the record's contents.
        f(prototype.newBuilderForType().mergeFrom(bytes).build().asInstanceOf[M])
      }
    } finally {
      outermost.close()
    }
  }

  /**
   * The default instance of `M`, used as a factory for builders. Generated message
   * classes have private constructors, so reflection must go through the static
   * `getDefaultInstance()` rather than `newInstance`.
   */
  private def defaultInstance: M =
    man.runtimeClass.getMethod("getDefaultInstance").invoke(null).asInstanceOf[M]
}
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.